diff --git a/CMakeLists.txt b/CMakeLists.txt index 87f4c080..bcae98fa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -50,7 +50,7 @@ if(CAF_INC_ENABLE_STANDALONE_BUILD) FetchContent_Declare( actor_framework GIT_REPOSITORY https://github.com/actor-framework/actor-framework.git - GIT_TAG 4f8609b + GIT_TAG 963ef5fe ) FetchContent_Populate(actor_framework) set(CAF_ENABLE_EXAMPLES OFF CACHE BOOL "" FORCE) diff --git a/examples/net/web-socket-calculator.cpp b/examples/net/web-socket-calculator.cpp index 390f6d83..5142bea7 100644 --- a/examples/net/web-socket-calculator.cpp +++ b/examples/net/web-socket-calculator.cpp @@ -36,6 +36,7 @@ // asyncio.get_event_loop().run_until_complete(hello()) // ~~~ +#include "caf/actor.hpp" #include "caf/actor_system.hpp" #include "caf/actor_system_config.hpp" #include "caf/byte_span.hpp" @@ -47,6 +48,7 @@ #include "caf/net/socket_manager.hpp" #include "caf/net/tcp_accept_socket.hpp" #include "caf/net/web_socket_server.hpp" +#include "caf/string_view.hpp" #include "caf/tag/mixed_message_oriented.hpp" #include diff --git a/libcaf_net/CMakeLists.txt b/libcaf_net/CMakeLists.txt index 1f71c3c3..a122ce62 100644 --- a/libcaf_net/CMakeLists.txt +++ b/libcaf_net/CMakeLists.txt @@ -4,8 +4,6 @@ file(GLOB_RECURSE CAF_NET_HEADERS "caf/*.hpp") # -- add consistency checks for enum to_string implementations ----------------- -caf_incubator_add_enum_consistency_check("caf/net/basp/connection_state.hpp" - "src/basp/connection_state_strings.cpp") caf_incubator_add_enum_consistency_check("caf/net/basp/ec.hpp" "src/basp/ec_strings.cpp") caf_incubator_add_enum_consistency_check("caf/net/basp/message_type.hpp" @@ -34,13 +32,11 @@ endfunction() # -- add library targets ------------------------------------------------------- add_library(libcaf_net_obj OBJECT ${CAF_NET_HEADERS} - #src/actor_proxy_impl.cpp - #src/basp/application.cpp - #src/endpoint_manager.cpp + src/actor_proxy_impl.cpp #src/net/backend/tcp.cpp - #src/net/backend/test.cpp - #src/net/endpoint_manager_queue.cpp - src/basp/connection_state_strings.cpp + src/net/backend/test.cpp + src/net/consumer_queue.cpp + src/basp/application.cpp src/basp/ec_strings.cpp src/basp/message_type_strings.cpp src/basp/operation_strings.cpp @@ -81,8 +77,8 @@ set_property(TARGET libcaf_net_obj PROPERTY POSITION_INDEPENDENT_CODE ON) caf_net_set_default_properties(libcaf_net_obj libcaf_net) target_include_directories(libcaf_net INTERFACE - $ - $) + $ + $) add_library(CAF::net ALIAS libcaf_net) @@ -126,21 +122,19 @@ target_link_libraries(caf-net-test PRIVATE CAF::test) caf_incubator_add_test_suites(caf-net-test accept_socket - #application + application convert_ip_endpoint datagram_socket detail.rfc6455 #datagram_transport - #doorman - #endpoint_manager header ip multiplexer net.actor_shell #net.backend.tcp - #net.basp.message_queue - #net.basp.ping_pong - #net.basp.worker + net.basp.message_queue + net.basp.ping_pong + net.basp.worker net.length_prefix_framing net.web_socket_server network_socket diff --git a/libcaf_net/caf/net/actor_proxy_impl.hpp b/libcaf_net/caf/net/actor_proxy_impl.hpp index c8efe5f5..4b787159 100644 --- a/libcaf_net/caf/net/actor_proxy_impl.hpp +++ b/libcaf_net/caf/net/actor_proxy_impl.hpp @@ -19,7 +19,7 @@ #pragma once #include "caf/actor_proxy.hpp" -#include "caf/net/endpoint_manager.hpp" +#include "caf/net/fwd.hpp" namespace caf::net { @@ -28,7 +28,7 @@ class actor_proxy_impl : public actor_proxy { public: using super = actor_proxy; - actor_proxy_impl(actor_config& cfg, endpoint_manager_ptr dst); + actor_proxy_impl(actor_config& cfg, basp::application* app); ~actor_proxy_impl() override; @@ -37,7 +37,7 @@ class actor_proxy_impl : public actor_proxy { void kill_proxy(execution_unit* ctx, error rsn) override; private: - endpoint_manager_ptr dst_; + basp::application* app_; }; } // namespace caf::net diff --git a/libcaf_net/caf/net/all.hpp b/libcaf_net/caf/net/all.hpp index a3165b78..cf791a15 100644 --- a/libcaf_net/caf/net/all.hpp +++ b/libcaf_net/caf/net/all.hpp @@ -19,15 +19,13 @@ #pragma once #include "caf/net/actor_proxy_impl.hpp" +#include "caf/net/consumer_queue.hpp" #include "caf/net/datagram_socket.hpp" #include "caf/net/datagram_transport.hpp" #include "caf/net/defaults.hpp" -#include "caf/net/endpoint_manager.hpp" -#include "caf/net/endpoint_manager_queue.hpp" #include "caf/net/fwd.hpp" #include "caf/net/host.hpp" #include "caf/net/ip.hpp" -#include "caf/net/make_endpoint_manager.hpp" #include "caf/net/middleman.hpp" #include "caf/net/middleman_backend.hpp" #include "caf/net/multiplexer.hpp" diff --git a/libcaf_net/caf/net/backend/test.hpp b/libcaf_net/caf/net/backend/test.hpp index adf6adb3..c462c6e4 100644 --- a/libcaf_net/caf/net/backend/test.hpp +++ b/libcaf_net/caf/net/backend/test.hpp @@ -19,23 +19,26 @@ #pragma once #include +#include +#include #include "caf/detail/net_export.hpp" -#include "caf/net/endpoint_manager.hpp" +#include "caf/net/basp/application.hpp" #include "caf/net/fwd.hpp" #include "caf/net/middleman_backend.hpp" +#include "caf/net/socket_manager.hpp" #include "caf/net/stream_socket.hpp" #include "caf/node_id.hpp" namespace caf::net::backend { /// Minimal backend for unit testing. -/// @warning this backend is *not* thread safe. class CAF_NET_EXPORT test : public middleman_backend { public: // -- member types ----------------------------------------------------------- - using peer_entry = std::pair; + using peer_entry + = std::tuple; // -- constructors, destructors, and assignment operators -------------------- @@ -49,9 +52,9 @@ class CAF_NET_EXPORT test : public middleman_backend { void stop() override; - endpoint_manager_ptr peer(const node_id& id) override; + socket_manager_ptr peer(const node_id& id) override; - expected get_or_connect(const uri& locator) override; + expected get_or_connect(const uri& locator) override; void resolve(const uri& locator, const actor& listener) override; @@ -62,7 +65,8 @@ class CAF_NET_EXPORT test : public middleman_backend { // -- properties ------------------------------------------------------------- stream_socket socket(const node_id& peer_id) { - return get_peer(peer_id).first; + auto& entry = get_peer(peer_id); + return std::get(entry); } uint16_t port() const noexcept override; @@ -78,6 +82,8 @@ class CAF_NET_EXPORT test : public middleman_backend { std::map peers_; proxy_registry proxies_; + + std::mutex lock_; }; } // namespace caf::net::backend diff --git a/libcaf_net/caf/net/basp/application.hpp b/libcaf_net/caf/net/basp/application.hpp index c2927c5b..e1b6501f 100644 --- a/libcaf_net/caf/net/basp/application.hpp +++ b/libcaf_net/caf/net/basp/application.hpp @@ -25,9 +25,12 @@ #include #include +#include "caf/actor.hpp" #include "caf/actor_addr.hpp" #include "caf/actor_system.hpp" #include "caf/actor_system_config.hpp" +#include "caf/binary_deserializer.hpp" +#include "caf/binary_serializer.hpp" #include "caf/byte.hpp" #include "caf/byte_span.hpp" #include "caf/callback.hpp" @@ -36,19 +39,23 @@ #include "caf/detail/worker_hub.hpp" #include "caf/error.hpp" #include "caf/fwd.hpp" -#include "caf/net/basp/connection_state.hpp" +#include "caf/mailbox_element.hpp" +#include "caf/net/actor_proxy_impl.hpp" #include "caf/net/basp/constants.hpp" +#include "caf/net/basp/ec.hpp" #include "caf/net/basp/header.hpp" #include "caf/net/basp/message_queue.hpp" #include "caf/net/basp/message_type.hpp" #include "caf/net/basp/worker.hpp" -#include "caf/net/endpoint_manager.hpp" -#include "caf/net/packet_writer.hpp" +#include "caf/net/consumer_queue.hpp" #include "caf/net/receive_policy.hpp" +#include "caf/net/socket_manager.hpp" #include "caf/node_id.hpp" #include "caf/proxy_registry.hpp" #include "caf/response_promise.hpp" #include "caf/scoped_execution_unit.hpp" +#include "caf/send.hpp" +#include "caf/tag/message_oriented.hpp" #include "caf/unit.hpp" namespace caf::net::basp { @@ -58,9 +65,11 @@ class CAF_NET_EXPORT application { public: // -- member types ----------------------------------------------------------- - using hub_type = detail::worker_hub; + using input_tag = tag::message_oriented; + + using byte_span = span; - struct test_tag {}; + using hub_type = detail::worker_hub; // -- constructors, destructors, and assignment operators -------------------- @@ -75,116 +84,436 @@ class CAF_NET_EXPORT application { // -- interface functions ---------------------------------------------------- - template - error init(Parent& parent) { + template + error init(socket_manager* owner, LowerLayerPtr down, const settings& cfg) { // Initialize member variables. - system_ = &parent.system(); + owner_ = owner; + system_ = &owner->system(); executor_.system_ptr(system_); executor_.proxy_registry_ptr(&proxies_); - // Allow unit tests to run the application without endpoint manager. - if constexpr (!std::is_base_of::value) - manager_ = &parent.manager(); - size_t workers; - if (auto workers_cfg = get_if(&system_->config(), - "caf.middleman.workers")) - workers = *workers_cfg; - else - workers = std::min(3u, std::thread::hardware_concurrency() / 4u) + 1; + max_throughput_ = get_or(cfg, "caf.scheduler.max-throughput", + defaults::scheduler::max_throughput); + auto workers = get_or( + cfg, "caf.middleman.workers", + std::min(3u, std::thread::hardware_concurrency() / 4u) + 1); for (size_t i = 0; i < workers; ++i) hub_->add_new_worker(*queue_, proxies_); // Write handshake. - auto hdr = parent.next_header_buffer(); - auto payload = parent.next_payload_buffer(); - if (auto err = generate_handshake(payload)) - return err; - to_bytes(header{message_type::handshake, - static_cast(payload.size()), version}, - hdr); - parent.write_packet(hdr, payload); - parent.transport().configure_read(receive_policy::exactly(header_size)); - return none; + auto app_ids = get_or(cfg, "caf.middleman.app-identifiers", + application::default_app_ids()); + return write_message(down, header{message_type::handshake, version}, + system().node(), app_ids); } - error write_message(packet_writer& writer, - std::unique_ptr ptr); - - template - error handle_data(Parent& parent, byte_span bytes) { - static_assert(std::is_base_of::value, - "parent must implement packet_writer"); - size_t next_read_size = header_size; - if (auto err = handle(next_read_size, parent, bytes)) - return err; - parent.transport().configure_read(receive_policy::exactly(next_read_size)); - return none; + template + bool prepare_send(LowerLayerPtr& down) { + CAF_LOG_TRACE(""); + if (!handshake_complete()) + return true; + if (auto err = dequeue_events(down)) { + CAF_LOG_ERROR("dequeue_events failed: " << CAF_ARG(err)); + down->abort_reason(err); + return false; + } + if (auto err = dequeue_messages(down)) { + CAF_LOG_ERROR("dequeue_messages failed: " << CAF_ARG(err)); + down->abort_reason(err); + return false; + } + return true; } - void resolve(packet_writer& writer, string_view path, const actor& listener); - - static void new_proxy(packet_writer& writer, actor_id id); - - void local_actor_down(packet_writer& writer, actor_id id, error reason); + template + ptrdiff_t consume(LowerLayerPtr& down, byte_span buffer) { + CAF_LOG_TRACE(CAF_ARG2("buffer.size", buffer.size())); + if (auto err = handle(down, buffer)) { + CAF_LOG_ERROR("could not handle message: " << CAF_ARG(err)); + down->abort_reason(err); + return -1; + } + return buffer.size(); + } - template - void timeout(Parent&, const std::string&, uint64_t) { - // nop + template + bool done_sending(LowerLayerPtr&) { + CAF_LOG_TRACE(""); + if (mailbox_.blocked()) + return true; + return (mailbox_.empty() && mailbox_.try_block()); } - void handle_error(sec) { + template + void abort(LowerLayerPtr&, const error&) { + CAF_LOG_TRACE(""); // nop } + void resolve(string_view path, const actor& listener); + + strong_actor_ptr make_proxy(const node_id& nid, const actor_id& aid); + // -- utility functions ------------------------------------------------------ strong_actor_ptr resolve_local_path(string_view path); + /// Writes a message to the message buffer of `down`. + template + error write_message(LowerLayerPtr& down, header hdr, Ts&&... xs) { + CAF_LOG_TRACE(CAF_ARG(hdr)); + down->begin_message(); + auto& buf = down->message_buffer(); + binary_serializer sink{&executor_, buf}; + if (!sink.apply_objects(hdr, xs...)) + return sink.get_error(); + down->end_message(); + return none; + } + // -- properties ------------------------------------------------------------- - connection_state state() const noexcept { - return state_; + bool handshake_complete() const noexcept { + return handshake_complete_; } actor_system& system() const noexcept { return *system_; } + // -- mailbox access --------------------------------------------------------- + + /// Enqueues an event to the mailbox. + template + void enqueue_event(Ts&&... xs) { + enqueue(new consumer_queue::event(std::forward(xs)...)); + } + + /// Enqueues a message to the mailbox. + void enqueue(mailbox_element_ptr msg, strong_actor_ptr receiver); + private: - // -- handling of incoming messages ------------------------------------------ + bool enqueue(consumer_queue::element* ptr); + + consumer_queue::message_ptr next_message() { + if (mailbox_.blocked()) + return nullptr; + mailbox_.fetch_more(); + auto& q = std::get<1>(mailbox_.queue().queues()); + auto ts = q.next_task_size(); + if (ts == 0) + return nullptr; + q.inc_deficit(ts); + auto result = q.next(); + if (mailbox_.empty()) + mailbox_.try_block(); + return result; + } - error handle(size_t& next_read_size, packet_writer& writer, byte_span bytes); + // -- handling of outgoing messages and events ------------------------------- + + template + error dequeue_events(LowerLayerPtr& down) { + CAF_LOG_TRACE(""); + if (!mailbox_.blocked()) { + mailbox_.fetch_more(); + auto& q = std::get<0>(mailbox_.queue().queues()); + do { + q.inc_deficit(q.total_task_size()); + for (auto ptr = q.next(); ptr != nullptr; ptr = q.next()) { + auto f = detail::make_overload( + [&](consumer_queue::event::resolve_request& x) { + write_resolve_request(down, x.path, x.listener); + }, + [&](consumer_queue::event::new_proxy& x) { new_proxy(down, x.id); }, + [&](consumer_queue::event::local_actor_down& x) { + local_actor_down(down, x.id, std::move(x.reason)); + }, + [&](consumer_queue::event::timeout& x) { + timeout(down, std::move(x.type), x.id); + }); + visit(f, ptr->value); + } + } while (!q.empty()); + } + return none; + } - error handle(packet_writer& writer, header hdr, byte_span payload); + template + void write_resolve_request(LowerLayerPtr& down, const std::string& path, + const actor& listener) { + CAF_LOG_TRACE(CAF_ARG(path) << CAF_ARG(listener)); + auto req_id = next_request_id_++; + if (auto err = write_message( + down, header{message_type::resolve_request, req_id}, path)) { + anon_send(listener, resolve_atom_v, err); + return; + } + pending_resolves_.emplace(req_id, listener); + } - error handle_handshake(packet_writer& writer, header hdr, byte_span payload); + template + void new_proxy(LowerLayerPtr& down, actor_id aid) { + CAF_LOG_TRACE(CAF_ARG(aid)); + if (auto err = write_message(down, header{message_type::monitor_message, + static_cast(aid)})) + down->abort_reason(err); + } - error handle_actor_message(packet_writer& writer, header hdr, - byte_span payload); + template + void local_actor_down(LowerLayerPtr& down, actor_id aid, error reason) { + CAF_LOG_TRACE(CAF_ARG(aid) << CAF_ARG(reason)); + if (auto err = write_message( + down, header{message_type::down_message, static_cast(aid)}, + reason)) + down->abort_reason(err); + } - error handle_resolve_request(packet_writer& writer, header rec_hdr, - byte_span received); + template + void timeout(LowerLayerPtr& down, std::string type, uint64_t id) { + CAF_LOG_TRACE(CAF_ARG(type) << CAF_ARG(id)); + down->timeout(std::move(type), id); + } - error handle_resolve_response(packet_writer& writer, header received_hdr, - byte_span received); + template + error dequeue_messages(LowerLayerPtr& down) { + CAF_LOG_TRACE(""); + while (down->can_send_more()) { + auto ptr = next_message(); + if (ptr == nullptr) + break; + CAF_ASSERT(ptr->msg != nullptr); + CAF_LOG_TRACE(CAF_ARG2("content", ptr->msg->content())); + const auto& src = ptr->msg->sender; + const auto& dst = ptr->receiver; + if (dst == nullptr) { + // TODO: valid? + return none; + } + node_id nid{}; + actor_id aid{0}; + if (src != nullptr) { + auto src_id = src->id(); + system().registry().put(src_id, src); + nid = src->node(); + aid = src_id; + } + if (auto err = write_message( + down, + header{message_type::actor_message, ptr->msg->mid.integer_value()}, + nid, aid, dst->id(), ptr->msg->stages, ptr->msg->content())) { + return err; + } + } + return none; + } + + // -- handling of incoming messages ------------------------------------------ + + template + error handle(LowerLayerPtr& down, byte_span bytes) { + CAF_LOG_TRACE(CAF_ARG2("bytes.size", bytes.size())); + if (!handshake_complete_) { + if (bytes.size() < header_size) + return ec::unexpected_number_of_bytes; + auto hdr = header::from_bytes(bytes); + if (hdr.type != message_type::handshake) + return ec::missing_handshake; + if (hdr.operation_data != version) + return ec::version_mismatch; + if (auto err = handle_handshake(down, hdr, bytes.subspan(header_size))) + return err; + handshake_complete_ = true; + return none; + } else { + if (bytes.size() < header_size) + return ec::unexpected_number_of_bytes; + auto hdr = header::from_bytes(bytes); + return handle(down, hdr, bytes.subspan(header_size)); + } + } + + template + error handle(LowerLayerPtr& down, header hdr, byte_span payload) { + CAF_LOG_TRACE(CAF_ARG(hdr) << CAF_ARG2("payload.size", payload.size())); + switch (hdr.type) { + case message_type::handshake: + return ec::unexpected_handshake; + case message_type::actor_message: + return handle_actor_message(down, hdr, payload); + case message_type::resolve_request: + return handle_resolve_request(down, hdr, payload); + case message_type::resolve_response: + return handle_resolve_response(down, hdr, payload); + case message_type::monitor_message: + return handle_monitor_message(down, hdr, payload); + case message_type::down_message: + return handle_down_message(down, hdr, payload); + case message_type::heartbeat: + return none; + default: + return ec::unimplemented; + } + } + + template + error handle_handshake(LowerLayerPtr&, header hdr, byte_span payload) { + CAF_LOG_TRACE(CAF_ARG(hdr) << CAF_ARG2("payload.size", payload.size())); + if (hdr.type != message_type::handshake) + return ec::missing_handshake; + if (hdr.operation_data != version) + return ec::version_mismatch; + node_id peer_id; + std::vector app_ids; + binary_deserializer source{&executor_, payload}; + if (!source.apply_objects(peer_id, app_ids)) + return source.get_error(); + if (!peer_id || app_ids.empty()) + return ec::invalid_handshake; + auto ids = get_or(system().config(), "caf.middleman.app-identifiers", + basp::application::default_app_ids()); + auto predicate = [=](const std::string& x) { + return std::find(ids.begin(), ids.end(), x) != ids.end(); + }; + if (std::none_of(app_ids.begin(), app_ids.end(), predicate)) + return ec::app_identifiers_mismatch; + peer_id_ = std::move(peer_id); + return none; + } + + template + error handle_actor_message(LowerLayerPtr&, header hdr, byte_span payload) { + CAF_LOG_TRACE(CAF_ARG(hdr) << CAF_ARG2("payload.size", payload.size())); + auto worker = hub_->pop(); + if (worker != nullptr) { + CAF_LOG_DEBUG("launch BASP worker for deserializing an actor_message"); + worker->launch(node_id{}, hdr, payload); + } else { + CAF_LOG_DEBUG( + "out of BASP workers, continue deserializing an actor_message"); + // If no worker is available then we have no other choice than to take + // the performance hit and deserialize in this thread. + struct handler : remote_message_handler { + handler(message_queue* queue, proxy_registry* proxies, + actor_system* system, node_id last_hop, basp::header& hdr, + byte_span payload) + : queue_(queue), + proxies_(proxies), + system_(system), + last_hop_(std::move(last_hop)), + hdr_(hdr), + payload_(payload) { + msg_id_ = queue_->new_id(); + } + message_queue* queue_; + proxy_registry* proxies_; + actor_system* system_; + node_id last_hop_; + basp::header& hdr_; + byte_span payload_; + uint64_t msg_id_; + }; + handler f{queue_.get(), &proxies_, system_, node_id{}, hdr, payload}; + f.handle_remote_message(&executor_); + } + return none; + } - error handle_monitor_message(packet_writer& writer, header received_hdr, - byte_span received); + template + error + handle_resolve_request(LowerLayerPtr& down, header hdr, byte_span payload) { + CAF_LOG_TRACE(CAF_ARG(hdr) << CAF_ARG2("payload.size", payload.size())); + CAF_ASSERT(hdr.type == message_type::resolve_request); + size_t path_size = 0; + binary_deserializer source{&executor_, payload}; + if (!source.begin_sequence(path_size)) + return source.get_error(); + // We expect the received buffer to contain the path only. + if (path_size != source.remaining()) + return ec::invalid_payload; + auto remainder = source.remainder(); + string_view path{reinterpret_cast(remainder.data()), + remainder.size()}; + // Write result. + auto result = resolve_local_path(path); + actor_id aid; + std::set ifs; + if (result) { + aid = result->id(); + system().registry().put(aid, result); + } else { + aid = 0; + } + // TODO: figure out how to obtain messaging interface. + return write_message( + down, header{message_type::resolve_response, hdr.operation_data}, aid, + ifs); + } - error handle_down_message(packet_writer& writer, header received_hdr, - byte_span received); + template + error handle_resolve_response(LowerLayerPtr&, header hdr, byte_span payload) { + CAF_LOG_TRACE(CAF_ARG(hdr) << CAF_ARG2("payload.size", payload.size())); + CAF_ASSERT(hdr.type == message_type::resolve_response); + auto i = pending_resolves_.find(hdr.operation_data); + if (i == pending_resolves_.end()) { + CAF_LOG_ERROR("received unknown ID in resolve_response message"); + return none; + } + auto guard = detail::make_scope_guard([&] { pending_resolves_.erase(i); }); + actor_id aid; + std::set ifs; + binary_deserializer source{&executor_, payload}; + if (!source.apply_objects(aid, ifs)) { + anon_send(i->second, sec::remote_lookup_failed); + return source.get_error(); + } + if (aid == 0) { + anon_send(i->second, strong_actor_ptr{nullptr}, std::move(ifs)); + return none; + } + anon_send(i->second, proxies_.get_or_put(peer_id_, aid), std::move(ifs)); + return none; + } - /// Writes the handshake payload to `buf_`. - error generate_handshake(byte_buffer& buf); + template + error + handle_monitor_message(LowerLayerPtr& down, header hdr, byte_span payload) { + CAF_LOG_TRACE(CAF_ARG(hdr) << CAF_ARG2("payload.size", payload.size())); + if (!payload.empty()) + return ec::unexpected_payload; + auto aid = static_cast(hdr.operation_data); + auto hdl = system().registry().get(aid); + if (hdl != nullptr) { + hdl->get()->attach_functor([this, aid](error reason) mutable { + this->enqueue_event(aid, std::move(reason)); + }); + } else { + error reason = exit_reason::unknown; + return write_message( + down, header{message_type::down_message, hdr.operation_data}, reason); + } + return none; + } + + template + error handle_down_message(LowerLayerPtr&, header hdr, byte_span payload) { + CAF_LOG_TRACE(CAF_ARG(hdr) << CAF_ARG2("payload.size", payload.size())); + error reason; + binary_deserializer source{&executor_, payload}; + if (!source.apply_objects(reason)) + return source.get_error(); + proxies_.erase(peer_id_, hdr.operation_data, std::move(reason)); + return none; + } // -- member variables ------------------------------------------------------- + // Stores incoming actor messages. + consumer_queue::type mailbox_; + /// Stores a pointer to the parent actor system. actor_system* system_ = nullptr; - /// Stores the expected type of the next incoming message. - connection_state state_ = connection_state::await_handshake_header; - - /// Caches the last header while waiting for the matching payload. - header hdr_; + /// Stores whether the BASP handshake completed successfully. + bool handshake_complete_ = false; /// Stores the ID of our peer. node_id peer_id_; @@ -201,8 +530,13 @@ class CAF_NET_EXPORT application { /// Points to the factory object for generating proxies. proxy_registry& proxies_; - /// Points to the endpoint manager that owns this applications. - endpoint_manager* manager_ = nullptr; + /// Points to the socket manager that owns this applications. + socket_manager* owner_ = nullptr; + + // Guards access to owner_. + std::mutex owner_mtx_; + + size_t max_throughput_ = 0; /// Provides pointers to the actor system as well as the registry, /// serializers and deserializer. diff --git a/libcaf_net/caf/net/basp/connection_state.hpp b/libcaf_net/caf/net/basp/connection_state.hpp deleted file mode 100644 index baee1118..00000000 --- a/libcaf_net/caf/net/basp/connection_state.hpp +++ /dev/null @@ -1,49 +0,0 @@ -/****************************************************************************** - * ____ _ _____ * - * / ___| / \ | ___| C++ * - * | | / _ \ | |_ Actor * - * | |___ / ___ \| _| Framework * - * \____/_/ \_|_| * - * * - * Copyright 2011-2019 Dominik Charousset * - * * - * Distributed under the terms and conditions of the BSD 3-Clause License or * - * (at your option) under the terms and conditions of the Boost Software * - * License 1.0. See accompanying files LICENSE and LICENSE_ALTERNATIVE. * - * * - * If you did not receive a copy of the license files, see * - * http://opensource.org/licenses/BSD-3-Clause and * - * http://www.boost.org/LICENSE_1_0.txt. * - ******************************************************************************/ - -#pragma once - -#include - -namespace caf::net::basp { - -/// @addtogroup BASP - -/// Stores the state of a connection in a `basp::application`. -enum class connection_state { - /// Initial state for any connection to wait for the peer's handshake. - await_handshake_header, - /// Indicates that the header for the peer's handshake arrived and BASP - /// requires the payload next. - await_handshake_payload, - /// Indicates that a connection is established and this node is waiting for - /// the next BASP header. - await_header, - /// Indicates that this node has received a header with non-zero payload and - /// is waiting for the data. - await_payload, - /// Indicates that the connection is about to shut down. - shutdown, -}; - -/// @relates connection_state -std::string to_string(connection_state x); - -/// @} - -} // namespace caf::net::basp diff --git a/libcaf_net/caf/net/basp/constants.hpp b/libcaf_net/caf/net/basp/constants.hpp index cac0972a..7afe4a7f 100644 --- a/libcaf_net/caf/net/basp/constants.hpp +++ b/libcaf_net/caf/net/basp/constants.hpp @@ -30,7 +30,7 @@ namespace caf::net::basp { constexpr uint64_t version = 1; /// Size of a BASP header in serialized form. -constexpr size_t header_size = 13; +constexpr size_t header_size = 9; /// @} diff --git a/libcaf_net/caf/net/basp/header.hpp b/libcaf_net/caf/net/basp/header.hpp index 7fb7c3e1..5fbbb734 100644 --- a/libcaf_net/caf/net/basp/header.hpp +++ b/libcaf_net/caf/net/basp/header.hpp @@ -38,13 +38,12 @@ struct CAF_NET_EXPORT header : detail::comparable
{ // -- constructors, destructors, and assignment operators -------------------- constexpr header() noexcept - : type(message_type::handshake), payload_len(0), operation_data(0) { + : type(message_type::handshake), operation_data(0) { // nop } - constexpr header(message_type type, uint32_t payload_len, - uint64_t operation_data) noexcept - : type(type), payload_len(payload_len), operation_data(operation_data) { + constexpr header(message_type type, uint64_t operation_data) noexcept + : type(type), operation_data(operation_data) { // nop } @@ -66,9 +65,6 @@ struct CAF_NET_EXPORT header : detail::comparable
{ /// Denotes the BASP operation and how `operation_data` gets interpreted. message_type type; - /// Stores the size in bytes for the payload that follows this header. - uint32_t payload_len; - /// Stores type-specific information such as the BASP version in handshakes. uint64_t operation_data; }; @@ -85,7 +81,6 @@ CAF_NET_EXPORT void to_bytes(header x, byte_buffer& buf); template bool inspect(Inspector& f, header& x) { return f.object(x).fields(f.field("type", x.type), - f.field("payload_len", x.payload_len), f.field("operation_data", x.operation_data)); } diff --git a/libcaf_net/caf/net/connection_acceptor.hpp b/libcaf_net/caf/net/connection_acceptor.hpp index 27106d19..fe4de0f6 100644 --- a/libcaf_net/caf/net/connection_acceptor.hpp +++ b/libcaf_net/caf/net/connection_acceptor.hpp @@ -19,8 +19,8 @@ #pragma once #include "caf/logger.hpp" -#include "caf/net/make_endpoint_manager.hpp" #include "caf/net/socket.hpp" +#include "caf/net/socket_manager.hpp" #include "caf/net/stream_socket.hpp" #include "caf/net/stream_transport.hpp" #include "caf/net/tcp_accept_socket.hpp" diff --git a/libcaf_net/caf/net/endpoint_manager_queue.hpp b/libcaf_net/caf/net/consumer_queue.hpp similarity index 94% rename from libcaf_net/caf/net/endpoint_manager_queue.hpp rename to libcaf_net/caf/net/consumer_queue.hpp index 4cf1692d..9a11c7b6 100644 --- a/libcaf_net/caf/net/endpoint_manager_queue.hpp +++ b/libcaf_net/caf/net/consumer_queue.hpp @@ -34,7 +34,7 @@ namespace caf::net { -class CAF_NET_EXPORT endpoint_manager_queue { +class CAF_NET_EXPORT consumer_queue { public: enum class element_type { event, message }; @@ -61,17 +61,15 @@ class CAF_NET_EXPORT endpoint_manager_queue { class event final : public element { public: struct resolve_request { - uri locator; + std::string path; actor listener; }; struct new_proxy { - node_id peer; actor_id id; }; struct local_actor_down { - node_id observing_peer; actor_id id; error reason; }; @@ -81,11 +79,11 @@ class CAF_NET_EXPORT endpoint_manager_queue { uint64_t id; }; - event(uri locator, actor listener); + event(std::string path, actor listener); - event(node_id peer, actor_id proxy_id); + event(actor_id proxy_id); - event(node_id observing_peer, actor_id local_actor_id, error reason); + event(actor_id local_actor_id, error reason); event(std::string type, uint64_t id); diff --git a/libcaf_net/caf/net/endpoint_manager.hpp b/libcaf_net/caf/net/endpoint_manager.hpp deleted file mode 100644 index 270a3c52..00000000 --- a/libcaf_net/caf/net/endpoint_manager.hpp +++ /dev/null @@ -1,101 +0,0 @@ -/****************************************************************************** - * ____ _ _____ * - * / ___| / \ | ___| C++ * - * | | / _ \ | |_ Actor * - * | |___ / ___ \| _| Framework * - * \____/_/ \_|_| * - * * - * Copyright 2011-2019 Dominik Charousset * - * * - * Distributed under the terms and conditions of the BSD 3-Clause License or * - * (at your option) under the terms and conditions of the Boost Software * - * License 1.0. See accompanying files LICENSE and LICENSE_ALTERNATIVE. * - * * - * If you did not receive a copy of the license files, see * - * http://opensource.org/licenses/BSD-3-Clause and * - * http://www.boost.org/LICENSE_1_0.txt. * - ******************************************************************************/ - -#pragma once - -#include -#include -#include - -#include "caf/actor.hpp" -#include "caf/actor_clock.hpp" -#include "caf/detail/net_export.hpp" -#include "caf/fwd.hpp" -#include "caf/intrusive/drr_queue.hpp" -#include "caf/intrusive/fifo_inbox.hpp" -#include "caf/intrusive/singly_linked.hpp" -#include "caf/mailbox_element.hpp" -#include "caf/net/endpoint_manager_queue.hpp" -#include "caf/net/socket_manager.hpp" -#include "caf/variant.hpp" - -namespace caf::net { - -/// Manages a communication endpoint. -class CAF_NET_EXPORT endpoint_manager : public socket_manager { -public: - // -- member types ----------------------------------------------------------- - - using super = socket_manager; - - // -- constructors, destructors, and assignment operators -------------------- - - endpoint_manager(socket handle, const multiplexer_ptr& parent, - actor_system& sys); - - ~endpoint_manager() override; - - // -- properties ------------------------------------------------------------- - - actor_system& system() noexcept { - return sys_; - } - - const actor_system_config& config() const noexcept; - - // -- queue access ----------------------------------------------------------- - - bool at_end_of_message_queue(); - - endpoint_manager_queue::message_ptr next_message(); - - // -- event management ------------------------------------------------------- - - /// Resolves a path to a remote actor. - void resolve(uri locator, actor listener); - - /// Enqueues a message to the endpoint. - void enqueue(mailbox_element_ptr msg, strong_actor_ptr receiver); - - /// Enqueues an event to the endpoint. - template - void enqueue_event(Ts&&... xs) { - enqueue(new endpoint_manager_queue::event(std::forward(xs)...)); - } - - // -- pure virtual member functions ------------------------------------------ - - /// Initializes the manager before adding it to the multiplexer's event loop. - // virtual error init() = 0; - -protected: - bool enqueue(endpoint_manager_queue::element* ptr); - - /// Points to the hosting actor system. - actor_system& sys_; - - /// Stores control events and outbound messages. - endpoint_manager_queue::type queue_; - - /// Stores a proxy for interacting with the actor clock. - actor timeout_proxy_; -}; - -using endpoint_manager_ptr = intrusive_ptr; - -} // namespace caf::net diff --git a/libcaf_net/caf/net/endpoint_manager_impl.hpp b/libcaf_net/caf/net/endpoint_manager_impl.hpp deleted file mode 100644 index cd8b2268..00000000 --- a/libcaf_net/caf/net/endpoint_manager_impl.hpp +++ /dev/null @@ -1,131 +0,0 @@ -/****************************************************************************** - * ____ _ _____ * - * / ___| / \ | ___| C++ * - * | | / _ \ | |_ Actor * - * | |___ / ___ \| _| Framework * - * \____/_/ \_|_| * - * * - * Copyright 2011-2019 Dominik Charousset * - * * - * Distributed under the terms and conditions of the BSD 3-Clause License or * - * (at your option) under the terms and conditions of the Boost Software * - * License 1.0. See accompanying files LICENSE and LICENSE_ALTERNATIVE. * - * * - * If you did not receive a copy of the license files, see * - * http://opensource.org/licenses/BSD-3-Clause and * - * http://www.boost.org/LICENSE_1_0.txt. * - ******************************************************************************/ - -#pragma once - -#include "caf/abstract_actor.hpp" -#include "caf/actor_cast.hpp" -#include "caf/actor_system.hpp" -#include "caf/detail/overload.hpp" -#include "caf/net/endpoint_manager.hpp" - -namespace caf::net { - -template -class endpoint_manager_impl : public endpoint_manager { -public: - // -- member types ----------------------------------------------------------- - - using super = endpoint_manager; - - using transport_type = Transport; - - using application_type = typename transport_type::application_type; - - // -- constructors, destructors, and assignment operators -------------------- - - endpoint_manager_impl(const multiplexer_ptr& parent, actor_system& sys, - socket handle, Transport trans) - : super(handle, parent, sys), transport_(std::move(trans)) { - // nop - } - - ~endpoint_manager_impl() override { - // nop - } - - // -- properties ------------------------------------------------------------- - - transport_type& transport() { - return transport_; - } - - endpoint_manager_impl& manager() { - return *this; - } - - // -- timeout management ----------------------------------------------------- - - template - uint64_t - set_timeout(actor_clock::time_point tp, std::string type, Ts&&... xs) { - auto act = actor_cast(timeout_proxy_); - CAF_ASSERT(act != nullptr); - sys_.clock().set_multi_timeout(tp, act, std::move(type), next_timeout_id_); - transport_.set_timeout(next_timeout_id_, std::forward(xs)...); - return next_timeout_id_++; - } - - // -- interface functions ---------------------------------------------------- - - error init() /*override*/ { - this->register_reading(); - return transport_.init(*this); - } - - bool handle_read_event() override { - return transport_.handle_read_event(*this); - } - - bool handle_write_event() override { - if (!this->queue_.blocked()) { - this->queue_.fetch_more(); - auto& q = std::get<0>(this->queue_.queue().queues()); - do { - q.inc_deficit(q.total_task_size()); - for (auto ptr = q.next(); ptr != nullptr; ptr = q.next()) { - auto f = detail::make_overload( - [&](endpoint_manager_queue::event::resolve_request& x) { - transport_.resolve(*this, x.locator, x.listener); - }, - [&](endpoint_manager_queue::event::new_proxy& x) { - transport_.new_proxy(*this, x.peer, x.id); - }, - [&](endpoint_manager_queue::event::local_actor_down& x) { - transport_.local_actor_down(*this, x.observing_peer, x.id, - std::move(x.reason)); - }, - [&](endpoint_manager_queue::event::timeout& x) { - transport_.timeout(*this, x.type, x.id); - }); - visit(f, ptr->value); - } - } while (!q.empty()); - } - if (!transport_.handle_write_event(*this)) { - if (this->queue_.blocked()) - return false; - return !(this->queue_.empty() && this->queue_.try_block()); - } - return true; - } - - void handle_error(sec code) override { - transport_.handle_error(code); - } - -private: - transport_type transport_; - - /// Stores the id for the next timeout. - uint64_t next_timeout_id_; - - error err_; -}; - -} // namespace caf::net diff --git a/libcaf_net/caf/net/fwd.hpp b/libcaf_net/caf/net/fwd.hpp index 375e27c9..ca6ec246 100644 --- a/libcaf_net/caf/net/fwd.hpp +++ b/libcaf_net/caf/net/fwd.hpp @@ -73,6 +73,8 @@ using weak_multiplexer_ptr = std::weak_ptr; namespace caf::net::basp { +class application; + enum class ec : uint8_t; } // namespace caf::net::basp diff --git a/libcaf_net/caf/net/length_prefix_framing.hpp b/libcaf_net/caf/net/length_prefix_framing.hpp index 877d8204..b7ce8dd2 100644 --- a/libcaf_net/caf/net/length_prefix_framing.hpp +++ b/libcaf_net/caf/net/length_prefix_framing.hpp @@ -20,13 +20,16 @@ #include #include +#include #include #include "caf/byte.hpp" #include "caf/byte_span.hpp" #include "caf/detail/network_order.hpp" #include "caf/error.hpp" +#include "caf/net/message_oriented_layer_ptr.hpp" #include "caf/net/receive_policy.hpp" +#include "caf/net/socket_manager.hpp" #include "caf/sec.hpp" #include "caf/span.hpp" #include "caf/tag/message_oriented.hpp" @@ -48,63 +51,85 @@ class length_prefix_framing { using length_prefix_type = uint32_t; + static constexpr size_t header_length = sizeof(length_prefix_type); + static constexpr size_t max_message_length = INT32_MAX; - // -- interface for the upper layer ------------------------------------------ + // -- constructors, destructors, and assignment operators -------------------- - template - class access { - public: - access(LowerLayer* lower_layer, length_prefix_framing* this_layer) - : lower_layer_(lower_layer), this_layer_(this_layer) { - // nop - } + template + length_prefix_framing(Ts&&... xs) : upper_layer_(std::forward(xs)...) { + // nop + } - void begin_message() { - lower_layer_->begin_output(); - auto& buf = message_buffer(); - message_offset_ = buf.size(); - buf.insert(buf.end(), 4, byte{0}); - } + ~length_prefix_framing() = default; - byte_buffer& message_buffer() { - return lower_layer_->output_buffer(); - } + // -- initialization --------------------------------------------------------- - bool end_message() { - using detail::to_network_order; - auto& buf = message_buffer(); - auto msg_begin = buf.begin() + message_offset_; - auto msg_size = std::distance(msg_begin + 4, buf.end()); - if (msg_size > 0 && msg_size < max_message_length) { - auto u32_size = to_network_order(static_cast(msg_size)); - memcpy(std::addressof(*msg_begin), &u32_size, 4); - return true; - } else { - abort_reason(make_error( - sec::runtime_error, msg_size == 0 ? "logic error: message of size 0" - : "maximum message size exceeded")); - return false; - } - } + template + error + init(socket_manager* owner, LowerLayerPtr& down, const settings& config) { + CAF_LOG_TRACE(""); + down->configure_read(receive_policy::exactly(header_length)); + auto this_layer_ptr = make_message_oriented_layer_ptr(this, down); + return upper_layer_.init(owner, this_layer_ptr, config); + } - bool can_send_more() const noexcept { - return lower_layer_->can_send_more(); - } + // -- interface for the upper layer ------------------------------------------ - void abort_reason(error reason) { - return lower_layer_->abort_reason(std::move(reason)); - } + template + void begin_message(LowerLayerPtr& down) { + CAF_LOG_TRACE(""); + down->begin_output(); + auto& buf = down->output_buffer(); + message_offset_ = buf.size(); + buf.insert(buf.end(), 4, byte{0}); + } + + template + byte_buffer& message_buffer(LowerLayerPtr& down) { + return down->output_buffer(); + } - void configure_read(receive_policy policy) { - lower_layer_->configure_read(policy); + template + void end_message(LowerLayerPtr& down) { + CAF_LOG_TRACE(""); + using detail::to_network_order; + auto& buf = down->output_buffer(); + auto msg_begin = buf.begin() + message_offset_; + auto msg_size = std::distance(msg_begin + header_length, buf.end()); + if (msg_size > 0 && static_cast(msg_size) < max_message_length) { + auto u32_size = to_network_order(static_cast(msg_size)); + memcpy(std::addressof(*msg_begin), &u32_size, 4); + down->end_output(); + } else { + auto err = make_error(sec::runtime_error, + msg_size == 0 ? "logic error: message of size 0" + : "maximum message size exceeded"); + CAF_LOG_ERROR(err); + down->abort_reason(err); } + } + + template + bool can_send_more(LowerLayerPtr& down) const noexcept { + return down->can_send_more(); + } - private: - LowerLayer* lower_layer_; - length_prefix_framing* this_layer_; - size_t message_offset_ = 0; - }; + template + void abort_reason(LowerLayerPtr& down, error reason) { + return down->abort_reason(std::move(reason)); + } + + template + void configure_read(LowerLayerPtr&, receive_policy) { + // nop + } + + template + void timeout(LowerLayerPtr& down, std::string type, uint64_t id) { + down->timeout(std::move(type), id); + } // -- properties ------------------------------------------------------------- @@ -115,42 +140,61 @@ class length_prefix_framing { const auto& upper_layer() const noexcept { return upper_layer_; } + // -- role: upper layer ------------------------------------------------------ - template - bool prepare_send(LowerLayer& down) { - access this_layer{&down, this}; - return upper_layer_.prepare_send(this_layer); + template + bool prepare_send(LowerLayerPtr& down) { + auto this_layer_ptr = make_message_oriented_layer_ptr(this, down); + return upper_layer_.prepare_send(this_layer_ptr); } - template - bool done_sending(LowerLayer& down) { - access this_layer{&down, this}; - return upper_layer_.done_sending(this_layer); + template + bool done_sending(LowerLayerPtr& down) { + auto this_layer_ptr = make_message_oriented_layer_ptr(this, down); + return upper_layer_.done_sending(this_layer_ptr); } - template - void abort(LowerLayer& down, const error& reason) { - access this_layer{&down, this}; - return upper_layer_.abort(this_layer, reason); + template + void abort(LowerLayerPtr& down, const error& reason) { + auto this_layer_ptr = make_message_oriented_layer_ptr(this, down); + upper_layer_.abort(this_layer_ptr, reason); } - template - ptrdiff_t consume(LowerLayer& down, byte_span buffer, byte_span) { - using detail::from_network_order; - if (buffer.size() < 4) - return 0; - uint32_t u32_size = 0; - memcpy(&u32_size, buffer.data(), 4); - auto msg_size = static_cast(from_network_order(u32_size)); - if (buffer.size() < msg_size + 4) - return 0; - upper_layer_.consume(down, make_span(buffer.data() + 4, msg_size)); - return msg_size + 4; + template + ptrdiff_t consume(LowerLayerPtr& down, byte_span buffer, byte_span) { + CAF_LOG_TRACE(CAF_ARG2("buffer.size", buffer.size())); + if (awaiting_header_) { + using detail::from_network_order; + CAF_ASSERT(buffer.size() == header_length); + uint32_t u32_size = 0; + memcpy(&u32_size, buffer.data(), header_length); + msg_size_ = static_cast(from_network_order(u32_size)); + down->configure_read(receive_policy::exactly(msg_size_)); + awaiting_header_ = false; + return header_length; + } else { + CAF_ASSERT(buffer.size() == msg_size_); + auto this_layer_ptr = make_message_oriented_layer_ptr(this, down); + upper_layer_.consume(this_layer_ptr, buffer); + down->configure_read(receive_policy::exactly(header_length)); + awaiting_header_ = true; + return msg_size_; + } } private: + /// Holds the upper layer. UpperLayer upper_layer_; + + /// Holds the offset within the message buffer for writing the header. + size_t message_offset_ = 0; + + /// Holds the size of the next message. + size_t msg_size_ = 0; + + /// Signals wether a header or payload is expected with the next `consume`. + bool awaiting_header_ = true; }; } // namespace caf::net diff --git a/libcaf_net/caf/net/make_endpoint_manager.hpp b/libcaf_net/caf/net/make_endpoint_manager.hpp deleted file mode 100644 index 66dba0de..00000000 --- a/libcaf_net/caf/net/make_endpoint_manager.hpp +++ /dev/null @@ -1,35 +0,0 @@ -/****************************************************************************** - * ____ _ _____ * - * / ___| / \ | ___| C++ * - * | | / _ \ | |_ Actor * - * | |___ / ___ \| _| Framework * - * \____/_/ \_|_| * - * * - * Copyright 2011-2019 Dominik Charousset * - * * - * Distributed under the terms and conditions of the BSD 3-Clause License or * - * (at your option) under the terms and conditions of the Boost Software * - * License 1.0. See accompanying files LICENSE and LICENSE_ALTERNATIVE. * - * * - * If you did not receive a copy of the license files, see * - * http://opensource.org/licenses/BSD-3-Clause and * - * http://www.boost.org/LICENSE_1_0.txt. * - ******************************************************************************/ - -#pragma once - -#include "caf/detail/net_export.hpp" -#include "caf/make_counted.hpp" -#include "caf/net/endpoint_manager.hpp" -#include "caf/net/endpoint_manager_impl.hpp" - -namespace caf::net { - -template -endpoint_manager_ptr make_endpoint_manager(const multiplexer_ptr& mpx, - actor_system& sys, Transport trans) { - using impl = endpoint_manager_impl; - return make_counted(mpx, sys, std::move(trans)); -} - -} // namespace caf::net diff --git a/libcaf_net/caf/net/message_oriented_layer_ptr.hpp b/libcaf_net/caf/net/message_oriented_layer_ptr.hpp new file mode 100644 index 00000000..27a58d70 --- /dev/null +++ b/libcaf_net/caf/net/message_oriented_layer_ptr.hpp @@ -0,0 +1,105 @@ +/****************************************************************************** + * ____ _ _____ * + * / ___| / \ | ___| C++ * + * | | / _ \ | |_ Actor * + * | |___ / ___ \| _| Framework * + * \____/_/ \_|_| * + * * + * Copyright 2011-2020 Dominik Charousset * + * * + * Distributed under the terms and conditions of the BSD 3-Clause License or * + * (at your option) under the terms and conditions of the Boost Software * + * License 1.0. See accompanying files LICENSE and LICENSE_ALTERNATIVE. * + * * + * If you did not receive a copy of the license files, see * + * http://opensource.org/licenses/BSD-3-Clause and * + * http://www.boost.org/LICENSE_1_0.txt. * + ******************************************************************************/ + +#pragma once + +#include "caf/error.hpp" +#include "caf/fwd.hpp" +#include "caf/net/fwd.hpp" +#include "caf/net/receive_policy.hpp" + +namespace caf::net { + +/// Wraps a pointer to a message-oriented layer with a pointer to its lower +/// layer. Both pointers are then used to implement the interface required for a +/// message-oriented layer when calling into its upper layer. +template +class message_oriented_layer_ptr { +public: + class access { + public: + access(Layer* layer, LowerLayerPtr down) : lptr_(layer), llptr_(down) { + // nop + } + + bool can_send_more() const noexcept { + return lptr_->can_send_more(llptr_); + } + + void begin_message() { + lptr_->begin_message(llptr_); + } + + byte_buffer& message_buffer() { + return lptr_->message_buffer(llptr_); + } + + void end_message() { + lptr_->end_message(llptr_); + } + + void abort_reason(error reason) { + return lptr_->abort_reason(llptr_, std::move(reason)); + } + + const error& abort_reason() { + return lptr_->abort_reason(llptr_); + } + + void configure_read(receive_policy policy) { + lptr_->configure_read(llptr_, policy); + } + + void timeout(std::string type, uint64_t id) { + lptr_->timeout(llptr_, std::move(type), id); + } + + private: + Layer* lptr_; + LowerLayerPtr llptr_; + }; + + message_oriented_layer_ptr(Layer* layer, LowerLayerPtr down) + : access_(layer, down) { + // nop + } + + message_oriented_layer_ptr(const message_oriented_layer_ptr&) = default; + + explicit operator bool() const noexcept { + return true; + } + + access* operator->() const noexcept { + return &access_; + } + + access& operator*() const noexcept { + return access_; + } + +private: + mutable access access_; +}; + +template +auto make_message_oriented_layer_ptr(Layer* this_layer, LowerLayerPtr down) { + return message_oriented_layer_ptr{this_layer, down}; +} + +} // namespace caf::net diff --git a/libcaf_net/caf/net/middleman.hpp b/libcaf_net/caf/net/middleman.hpp index 980c83d6..ea0ac3d9 100644 --- a/libcaf_net/caf/net/middleman.hpp +++ b/libcaf_net/caf/net/middleman.hpp @@ -107,7 +107,7 @@ class CAF_NET_EXPORT middleman : public actor_system::module { // -- remoting --------------------------------------------------------------- - expected connect(const uri& locator); + expected connect(const uri& locator); // Publishes an actor. template diff --git a/libcaf_net/caf/net/middleman_backend.hpp b/libcaf_net/caf/net/middleman_backend.hpp index 5c93d9ef..547dfd0f 100644 --- a/libcaf_net/caf/net/middleman_backend.hpp +++ b/libcaf_net/caf/net/middleman_backend.hpp @@ -43,11 +43,11 @@ class CAF_NET_EXPORT middleman_backend : public proxy_registry::backend { /// Initializes the backend. virtual error init() = 0; - /// @returns The endpoint manager for `peer` on success, `nullptr` otherwise. - virtual endpoint_manager_ptr peer(const node_id& id) = 0; + /// @returns The socket manager for `peer` on success, `nullptr` otherwise. + virtual socket_manager_ptr peer(const node_id& id) = 0; /// Establishes a connection to a remote node. - virtual expected get_or_connect(const uri& locator) = 0; + virtual expected get_or_connect(const uri& locator) = 0; /// Resolves a path to a remote actor. virtual void resolve(const uri& locator, const actor& listener) = 0; diff --git a/libcaf_net/caf/net/socket_manager.hpp b/libcaf_net/caf/net/socket_manager.hpp index 18bfdc84..846a88e1 100644 --- a/libcaf_net/caf/net/socket_manager.hpp +++ b/libcaf_net/caf/net/socket_manager.hpp @@ -25,6 +25,7 @@ #include "caf/make_counted.hpp" #include "caf/net/actor_shell.hpp" #include "caf/net/fwd.hpp" +#include "caf/net/multiplexer.hpp" #include "caf/net/operation.hpp" #include "caf/net/socket.hpp" #include "caf/ref_counted.hpp" @@ -78,6 +79,10 @@ class CAF_NET_EXPORT socket_manager : public ref_counted { return parent_; } + actor_system& system() noexcept { + return mpx().system(); + } + /// Returns registered operations (read, write, or both). operation mask() const noexcept { return mask_; @@ -204,7 +209,6 @@ class socket_manager_impl : public socket_manager { bool handle_write_event() override { return protocol_.handle_write_event(this); } - void handle_error(sec code) override { abort_reason_ = code; return protocol_.abort(this, abort_reason_); diff --git a/libcaf_net/caf/net/stream_oriented_layer_ptr.hpp b/libcaf_net/caf/net/stream_oriented_layer_ptr.hpp index 0d24a3f0..57270376 100644 --- a/libcaf_net/caf/net/stream_oriented_layer_ptr.hpp +++ b/libcaf_net/caf/net/stream_oriented_layer_ptr.hpp @@ -69,6 +69,10 @@ class stream_oriented_layer_ptr { lptr_->configure_read(llptr_, policy); } + void timeout(std::string type, uint64_t id) { + lptr_->timeout(llptr_, std::move(type), id); + } + private: Layer* lptr_; LowerLayerPtr llptr_; diff --git a/libcaf_net/caf/net/stream_transport.hpp b/libcaf_net/caf/net/stream_transport.hpp index c18d35d1..862ef613 100644 --- a/libcaf_net/caf/net/stream_transport.hpp +++ b/libcaf_net/caf/net/stream_transport.hpp @@ -26,6 +26,7 @@ #include "caf/logger.hpp" #include "caf/net/fwd.hpp" #include "caf/net/receive_policy.hpp" +#include "caf/net/socket_manager.hpp" #include "caf/net/stream_oriented_layer_ptr.hpp" #include "caf/net/stream_socket.hpp" #include "caf/sec.hpp" @@ -56,9 +57,7 @@ class stream_transport { // nop } - virtual ~stream_transport() { - // nop - } + ~stream_transport() = default; // -- interface for stream_oriented_layer_ptr -------------------------------- @@ -106,6 +105,11 @@ class stream_transport { max_read_size_ = policy.max_size; } + template + void timeout(ParentPtr&, std::string, uint64_t) { + // nop + } + // -- properties ------------------------------------------------------------- auto& read_buffer() noexcept { @@ -155,6 +159,7 @@ class stream_transport { CAF_LOG_ERROR("send_buffer_size: " << socket_buf_size.error()); return std::move(socket_buf_size.error()); } + owner->register_reading(); auto this_layer_ptr = make_stream_oriented_layer_ptr(this, parent); return upper_layer_.init(owner, this_layer_ptr, config); } diff --git a/libcaf_net/caf/net/transport_worker.hpp b/libcaf_net/caf/net/transport_worker.hpp index 9ecaf51a..cd7254bf 100644 --- a/libcaf_net/caf/net/transport_worker.hpp +++ b/libcaf_net/caf/net/transport_worker.hpp @@ -19,7 +19,7 @@ #pragma once #include "caf/logger.hpp" -#include "caf/net/endpoint_manager_queue.hpp" +#include "caf/net/consumer_queue.hpp" #include "caf/net/fwd.hpp" #include "caf/net/packet_writer_decorator.hpp" #include "caf/unit.hpp" @@ -73,8 +73,8 @@ class transport_worker { } template - void write_message(Parent& parent, - std::unique_ptr msg) { + void + write_message(Parent& parent, std::unique_ptr msg) { auto writer = make_packet_writer_decorator(*this, parent); if (auto err = application_.write_message(writer, std::move(msg))) CAF_LOG_ERROR("write_message failed: " << err); diff --git a/libcaf_net/caf/net/transport_worker_dispatcher.hpp b/libcaf_net/caf/net/transport_worker_dispatcher.hpp index 6dab4e16..ea9b7dbd 100644 --- a/libcaf_net/caf/net/transport_worker_dispatcher.hpp +++ b/libcaf_net/caf/net/transport_worker_dispatcher.hpp @@ -21,7 +21,7 @@ #include #include "caf/logger.hpp" -#include "caf/net/endpoint_manager_queue.hpp" +#include "caf/net/consumer_queue.hpp" #include "caf/net/fwd.hpp" #include "caf/net/packet_writer_decorator.hpp" #include "caf/net/transport_worker.hpp" @@ -74,8 +74,8 @@ class transport_worker_dispatcher { } template - void write_message(Parent& parent, - std::unique_ptr msg) { + void + write_message(Parent& parent, std::unique_ptr msg) { auto receiver = msg->receiver; if (!receiver) return; @@ -132,8 +132,8 @@ class transport_worker_dispatcher { } template - expected add_new_worker(Parent& parent, node_id node, - id_type id) { + expected + add_new_worker(Parent& parent, node_id node, id_type id) { CAF_LOG_TRACE(CAF_ARG(node) << CAF_ARG(id)); auto application = factory_.make(); auto worker = std::make_shared(std::move(application), id); diff --git a/libcaf_net/src/actor_proxy_impl.cpp b/libcaf_net/src/actor_proxy_impl.cpp index a8a325d3..b934fcf0 100644 --- a/libcaf_net/src/actor_proxy_impl.cpp +++ b/libcaf_net/src/actor_proxy_impl.cpp @@ -21,13 +21,15 @@ #include "caf/actor_system.hpp" #include "caf/expected.hpp" #include "caf/logger.hpp" +#include "caf/net/basp/application.hpp" namespace caf::net { -actor_proxy_impl::actor_proxy_impl(actor_config& cfg, endpoint_manager_ptr dst) - : super(cfg), dst_(std::move(dst)) { - CAF_ASSERT(dst_ != nullptr); - dst_->enqueue_event(node(), id()); +actor_proxy_impl::actor_proxy_impl(actor_config& cfg, + caf::net::basp::application* app) + : super(cfg), app_(app) { + CAF_ASSERT(app != nullptr); + app_->enqueue_event(id()); } actor_proxy_impl::~actor_proxy_impl() { @@ -38,7 +40,7 @@ void actor_proxy_impl::enqueue(mailbox_element_ptr msg, execution_unit*) { CAF_PUSH_AID(0); CAF_ASSERT(msg != nullptr); CAF_LOG_SEND_EVENT(msg); - dst_->enqueue(std::move(msg), ctrl()); + app_->enqueue(std::move(msg), ctrl()); } void actor_proxy_impl::kill_proxy(execution_unit* ctx, error rsn) { diff --git a/libcaf_net/src/basp/application.cpp b/libcaf_net/src/basp/application.cpp index 1a44dacf..1ecb8714 100644 --- a/libcaf_net/src/basp/application.cpp +++ b/libcaf_net/src/basp/application.cpp @@ -20,103 +20,34 @@ #include -#include "caf/actor_system.hpp" -#include "caf/actor_system_config.hpp" -#include "caf/binary_deserializer.hpp" -#include "caf/binary_serializer.hpp" -#include "caf/byte_buffer.hpp" #include "caf/defaults.hpp" -#include "caf/detail/network_order.hpp" #include "caf/detail/parse.hpp" -#include "caf/error.hpp" #include "caf/logger.hpp" -#include "caf/net/basp/constants.hpp" -#include "caf/net/basp/ec.hpp" -#include "caf/net/packet_writer.hpp" #include "caf/no_stages.hpp" -#include "caf/none.hpp" -#include "caf/sec.hpp" -#include "caf/send.hpp" #include "caf/string_algorithms.hpp" namespace caf::net::basp { application::application(proxy_registry& proxies) - : proxies_(proxies), queue_{new message_queue}, hub_{new hub_type} { - // nop + : mailbox_(unit, unit, unit), + proxies_(proxies), + queue_{new message_queue}, + hub_{new hub_type} { + mailbox_.try_block(); } -error application::write_message( - packet_writer& writer, std::unique_ptr ptr) { - CAF_ASSERT(ptr != nullptr); - CAF_ASSERT(ptr->msg != nullptr); - CAF_LOG_TRACE(CAF_ARG2("content", ptr->msg->content())); - const auto& src = ptr->msg->sender; - const auto& dst = ptr->receiver; - if (dst == nullptr) { - // TODO: valid? - return none; - } - auto payload_buf = writer.next_payload_buffer(); - binary_serializer sink{system(), payload_buf}; - if (src != nullptr) { - auto src_id = src->id(); - system().registry().put(src_id, src); - if (!sink.apply_objects(src->node(), src_id, dst->id(), ptr->msg->stages)) - return sink.get_error(); - } else { - if (!sink.apply_objects(node_id{}, actor_id{0}, dst->id(), - ptr->msg->stages)) - return sink.get_error(); - } - if (!sink.apply_objects(ptr->msg->content())) - return sink.get_error(); - auto hdr = writer.next_header_buffer(); - to_bytes(header{message_type::actor_message, - static_cast(payload_buf.size()), - ptr->msg->mid.integer_value()}, - hdr); - writer.write_packet(hdr, payload_buf); - return none; -} - -void application::resolve(packet_writer& writer, string_view path, - const actor& listener) { +void application::resolve(string_view path, const actor& listener) { CAF_LOG_TRACE(CAF_ARG(path) << CAF_ARG(listener)); - auto payload = writer.next_payload_buffer(); - binary_serializer sink{&executor_, payload}; - if (!sink.apply_objects(path)) { - CAF_LOG_ERROR("unable to serialize path:" << sink.get_error()); - return; - } - auto req_id = next_request_id_++; - auto hdr = writer.next_header_buffer(); - to_bytes(header{message_type::resolve_request, - static_cast(payload.size()), req_id}, - hdr); - writer.write_packet(hdr, payload); - pending_resolves_.emplace(req_id, listener); + enqueue_event(to_string(path), listener); } -void application::new_proxy(packet_writer& writer, actor_id id) { - auto hdr = writer.next_header_buffer(); - to_bytes(header{message_type::monitor_message, 0, static_cast(id)}, - hdr); - writer.write_packet(hdr); -} - -void application::local_actor_down(packet_writer& writer, actor_id id, - error reason) { - auto payload = writer.next_payload_buffer(); - binary_serializer sink{system(), payload}; - if (!sink.apply_objects(reason)) - CAF_RAISE_ERROR("unable to serialize an error"); - auto hdr = writer.next_header_buffer(); - to_bytes(header{message_type::down_message, - static_cast(payload.size()), - static_cast(id)}, - hdr); - writer.write_packet(hdr, payload); +strong_actor_ptr application::make_proxy(const node_id& nid, + const actor_id& aid) { + CAF_LOG_TRACE(CAF_ARG(nid) << CAF_ARG(aid)); + using impl_type = actor_proxy_impl; + using handle_type = strong_actor_ptr; + actor_config cfg; + return make_actor(aid, nid, system_, cfg, this); } strong_actor_ptr application::resolve_local_path(string_view path) { @@ -141,252 +72,27 @@ strong_actor_ptr application::resolve_local_path(string_view path) { return nullptr; } -error application::handle(size_t& next_read_size, packet_writer& writer, - byte_span bytes) { - CAF_LOG_TRACE(CAF_ARG(state_) << CAF_ARG2("bytes.size", bytes.size())); - switch (state_) { - case connection_state::await_handshake_header: { - if (bytes.size() != header_size) - return ec::unexpected_number_of_bytes; - hdr_ = header::from_bytes(bytes); - if (hdr_.type != message_type::handshake) - return ec::missing_handshake; - if (hdr_.operation_data != version) - return ec::version_mismatch; - if (hdr_.payload_len == 0) - return ec::missing_payload; - state_ = connection_state::await_handshake_payload; - next_read_size = hdr_.payload_len; - return none; - } - case connection_state::await_handshake_payload: { - if (auto err = handle_handshake(writer, hdr_, bytes)) - return err; - state_ = connection_state::await_header; - return none; - } - case connection_state::await_header: { - if (bytes.size() != header_size) - return ec::unexpected_number_of_bytes; - hdr_ = header::from_bytes(bytes); - if (hdr_.payload_len == 0) - return handle(writer, hdr_, byte_span{}); - next_read_size = hdr_.payload_len; - state_ = connection_state::await_payload; - return none; - } - case connection_state::await_payload: { - if (bytes.size() != hdr_.payload_len) - return ec::unexpected_number_of_bytes; - state_ = connection_state::await_header; - return handle(writer, hdr_, bytes); - } - default: - return ec::illegal_state; - } +void application::enqueue(mailbox_element_ptr msg, strong_actor_ptr receiver) { + CAF_LOG_TRACE(CAF_ARG(msg) << CAF_ARG(receiver)); + using message_type = consumer_queue::message; + auto ptr = new message_type(std::move(msg), std::move(receiver)); + enqueue(ptr); } -error application::handle(packet_writer& writer, header hdr, - byte_span payload) { - CAF_LOG_TRACE(CAF_ARG(hdr) << CAF_ARG2("payload.size", payload.size())); - switch (hdr.type) { - case message_type::handshake: - return ec::unexpected_handshake; - case message_type::actor_message: - return handle_actor_message(writer, hdr, payload); - case message_type::resolve_request: - return handle_resolve_request(writer, hdr, payload); - case message_type::resolve_response: - return handle_resolve_response(writer, hdr, payload); - case message_type::monitor_message: - return handle_monitor_message(writer, hdr, payload); - case message_type::down_message: - return handle_down_message(writer, hdr, payload); - case message_type::heartbeat: - return none; +bool application::enqueue(consumer_queue::element* ptr) { + CAF_LOG_TRACE(""); + switch (mailbox_.push_back(ptr)) { + case intrusive::inbox_result::success: + return true; + case intrusive::inbox_result::unblocked_reader: { + std::unique_lock guard{owner_mtx_}; + if (owner_) + owner_->mpx().register_writing(owner_); + return true; + } default: - return ec::unimplemented; - } -} - -error application::handle_handshake(packet_writer&, header hdr, - byte_span payload) { - CAF_LOG_TRACE(CAF_ARG(hdr) << CAF_ARG2("payload.size", payload.size())); - if (hdr.type != message_type::handshake) - return ec::missing_handshake; - if (hdr.operation_data != version) - return ec::version_mismatch; - node_id peer_id; - std::vector app_ids; - binary_deserializer source{&executor_, payload}; - if (!source.apply_objects(peer_id, app_ids)) - return source.get_error(); - if (!peer_id || app_ids.empty()) - return ec::invalid_handshake; - auto ids = get_or(system().config(), "caf.middleman.app-identifiers", - basp::application::default_app_ids()); - auto predicate = [=](const std::string& x) { - return std::find(ids.begin(), ids.end(), x) != ids.end(); - }; - if (std::none_of(app_ids.begin(), app_ids.end(), predicate)) - return ec::app_identifiers_mismatch; - peer_id_ = std::move(peer_id); - state_ = connection_state::await_header; - return none; -} - -error application::handle_actor_message(packet_writer&, header hdr, - byte_span payload) { - auto worker = hub_->pop(); - if (worker != nullptr) { - CAF_LOG_DEBUG("launch BASP worker for deserializing an actor_message"); - worker->launch(node_id{}, hdr, payload); - } else { - CAF_LOG_DEBUG( - "out of BASP workers, continue deserializing an actor_message"); - // If no worker is available then we have no other choice than to take - // the performance hit and deserialize in this thread. - struct handler : remote_message_handler { - handler(message_queue* queue, proxy_registry* proxies, - actor_system* system, node_id last_hop, basp::header& hdr, - byte_span payload) - : queue_(queue), - proxies_(proxies), - system_(system), - last_hop_(std::move(last_hop)), - hdr_(hdr), - payload_(payload) { - msg_id_ = queue_->new_id(); - } - message_queue* queue_; - proxy_registry* proxies_; - actor_system* system_; - node_id last_hop_; - basp::header& hdr_; - byte_span payload_; - uint64_t msg_id_; - }; - handler f{queue_.get(), &proxies_, system_, node_id{}, hdr, payload}; - f.handle_remote_message(&executor_); - } - return none; -} - -error application::handle_resolve_request(packet_writer& writer, header rec_hdr, - byte_span received) { - CAF_LOG_TRACE(CAF_ARG(rec_hdr) << CAF_ARG2("received.size", received.size())); - CAF_ASSERT(rec_hdr.type == message_type::resolve_request); - size_t path_size = 0; - binary_deserializer source{&executor_, received}; - if (!source.begin_sequence(path_size)) - return source.get_error(); - // We expect the received buffer to contain the path only. - if (path_size != source.remaining()) - return ec::invalid_payload; - auto remainder = source.remainder(); - string_view path{reinterpret_cast(remainder.data()), - remainder.size()}; - // Write result. - auto result = resolve_local_path(path); - actor_id aid; - std::set ifs; - if (result) { - aid = result->id(); - system().registry().put(aid, result); - } else { - aid = 0; - } - // TODO: figure out how to obtain messaging interface. - auto payload = writer.next_payload_buffer(); - binary_serializer sink{&executor_, payload}; - if (!sink.apply_objects(aid, ifs)) - return sink.get_error(); - auto hdr = writer.next_header_buffer(); - to_bytes(header{message_type::resolve_response, - static_cast(payload.size()), - rec_hdr.operation_data}, - hdr); - writer.write_packet(hdr, payload); - return none; -} - -error application::handle_resolve_response(packet_writer&, header received_hdr, - byte_span received) { - CAF_LOG_TRACE(CAF_ARG(received_hdr) - << CAF_ARG2("received.size", received.size())); - CAF_ASSERT(received_hdr.type == message_type::resolve_response); - auto i = pending_resolves_.find(received_hdr.operation_data); - if (i == pending_resolves_.end()) { - CAF_LOG_ERROR("received unknown ID in resolve_response message"); - return none; - } - auto guard = detail::make_scope_guard([&] { pending_resolves_.erase(i); }); - actor_id aid; - std::set ifs; - binary_deserializer source{&executor_, received}; - if (!source.apply_objects(aid, ifs)) { - anon_send(i->second, sec::remote_lookup_failed); - return source.get_error(); + return false; } - if (aid == 0) { - anon_send(i->second, strong_actor_ptr{nullptr}, std::move(ifs)); - return none; - } - anon_send(i->second, proxies_.get_or_put(peer_id_, aid), std::move(ifs)); - return none; -} - -error application::handle_monitor_message(packet_writer& writer, - header received_hdr, - byte_span received) { - CAF_LOG_TRACE(CAF_ARG(received_hdr) - << CAF_ARG2("received.size", received.size())); - if (!received.empty()) - return ec::unexpected_payload; - auto aid = static_cast(received_hdr.operation_data); - auto hdl = system().registry().get(aid); - if (hdl != nullptr) { - endpoint_manager_ptr mgr = manager_; - auto nid = peer_id_; - hdl->get()->attach_functor([mgr, nid, aid](error reason) mutable { - mgr->enqueue_event(std::move(nid), aid, std::move(reason)); - }); - } else { - error reason = exit_reason::unknown; - auto payload = writer.next_payload_buffer(); - binary_serializer sink{&executor_, payload}; - if (!sink.apply_objects(reason)) - return sink.get_error(); - auto hdr = writer.next_header_buffer(); - to_bytes(header{message_type::down_message, - static_cast(payload.size()), - received_hdr.operation_data}, - hdr); - writer.write_packet(hdr, payload); - } - return none; -} - -error application::handle_down_message(packet_writer&, header received_hdr, - byte_span received) { - CAF_LOG_TRACE(CAF_ARG(received_hdr) - << CAF_ARG2("received.size", received.size())); - error reason; - binary_deserializer source{&executor_, received}; - if (!source.apply_objects(reason)) - return source.get_error(); - proxies_.erase(peer_id_, received_hdr.operation_data, std::move(reason)); - return none; -} - -error application::generate_handshake(byte_buffer& buf) { - binary_serializer sink{&executor_, buf}; - if (!sink.apply_objects(system().node(), - get_or(system().config(), - "caf.middleman.app-identifiers", - application::default_app_ids()))) - return sink.get_error(); - return none; } } // namespace caf::net::basp diff --git a/libcaf_net/src/basp/connection_state_strings.cpp b/libcaf_net/src/basp/connection_state_strings.cpp deleted file mode 100644 index 0a3f2e15..00000000 --- a/libcaf_net/src/basp/connection_state_strings.cpp +++ /dev/null @@ -1,31 +0,0 @@ -// clang-format off -// DO NOT EDIT: this file is auto-generated by caf-generate-enum-strings. -// Run the target update-enum-strings if this file is out of sync. -#include "caf/net/basp/connection_state.hpp" - -#include - -namespace caf { -namespace net { -namespace basp { - -std::string to_string(connection_state x) { - switch(x) { - default: - return "???"; - case connection_state::await_handshake_header: - return "await_handshake_header"; - case connection_state::await_handshake_payload: - return "await_handshake_payload"; - case connection_state::await_header: - return "await_header"; - case connection_state::await_payload: - return "await_payload"; - case connection_state::shutdown: - return "shutdown"; - }; -} - -} // namespace basp -} // namespace net -} // namespace caf diff --git a/libcaf_net/src/endpoint_manager.cpp b/libcaf_net/src/endpoint_manager.cpp deleted file mode 100644 index b89ec0f8..00000000 --- a/libcaf_net/src/endpoint_manager.cpp +++ /dev/null @@ -1,101 +0,0 @@ -/****************************************************************************** - * ____ _ _____ * - * / ___| / \ | ___| C++ * - * | | / _ \ | |_ Actor * - * | |___ / ___ \| _| Framework * - * \____/_/ \_|_| * - * * - * Copyright 2011-2019 Dominik Charousset * - * * - * Distributed under the terms and conditions of the BSD 3-Clause License or * - * (at your option) under the terms and conditions of the Boost Software * - * License 1.0. See accompanying files LICENSE and LICENSE_ALTERNATIVE. * - * * - * If you did not receive a copy of the license files, see * - * http://opensource.org/licenses/BSD-3-Clause and * - * http://www.boost.org/LICENSE_1_0.txt. * - ******************************************************************************/ - -#include "caf/net/endpoint_manager.hpp" - -#include "caf/intrusive/inbox_result.hpp" -#include "caf/net/multiplexer.hpp" -#include "caf/sec.hpp" -#include "caf/send.hpp" - -namespace caf::net { - -// -- constructors, destructors, and assignment operators ---------------------- - -endpoint_manager::endpoint_manager(socket handle, const multiplexer_ptr& parent, - actor_system& sys) - : super(handle, parent), sys_(sys), queue_(unit, unit, unit) { - queue_.try_block(); -} - -endpoint_manager::~endpoint_manager() { - // nop -} - -// -- properties --------------------------------------------------------------- - -const actor_system_config& endpoint_manager::config() const noexcept { - return sys_.config(); -} - -// -- queue access ------------------------------------------------------------- - -bool endpoint_manager::at_end_of_message_queue() { - return queue_.empty() && queue_.try_block(); -} - -endpoint_manager_queue::message_ptr endpoint_manager::next_message() { - if (queue_.blocked()) - return nullptr; - queue_.fetch_more(); - auto& q = std::get<1>(queue_.queue().queues()); - auto ts = q.next_task_size(); - if (ts == 0) - return nullptr; - q.inc_deficit(ts); - auto result = q.next(); - if (queue_.empty()) - queue_.try_block(); - return result; -} - -// -- event management --------------------------------------------------------- - -void endpoint_manager::resolve(uri locator, actor listener) { - using intrusive::inbox_result; - using event_type = endpoint_manager_queue::event; - auto ptr = new event_type(std::move(locator), listener); - if (!enqueue(ptr)) - anon_send(listener, resolve_atom_v, make_error(sec::request_receiver_down)); -} - -void endpoint_manager::enqueue(mailbox_element_ptr msg, - strong_actor_ptr receiver) { - using message_type = endpoint_manager_queue::message; - auto ptr = new message_type(std::move(msg), std::move(receiver)); - enqueue(ptr); -} - -bool endpoint_manager::enqueue(endpoint_manager_queue::element* ptr) { - switch (queue_.push_back(ptr)) { - case intrusive::inbox_result::success: - return true; - case intrusive::inbox_result::unblocked_reader: { - auto mpx = parent_.lock(); - if (mpx) { - mpx->register_writing(this); - return true; - } - return false; - } - default: - return false; - } -} - -} // namespace caf::net diff --git a/libcaf_net/src/header.cpp b/libcaf_net/src/header.cpp index c17885eb..1e63e10e 100644 --- a/libcaf_net/src/header.cpp +++ b/libcaf_net/src/header.cpp @@ -31,10 +31,8 @@ namespace { void to_bytes_impl(const header& x, byte* ptr) { *ptr = static_cast(x.type); - auto payload_len = detail::to_network_order(x.payload_len); - memcpy(ptr + 1, &payload_len, sizeof(payload_len)); auto operation_data = detail::to_network_order(x.operation_data); - memcpy(ptr + 5, &operation_data, sizeof(operation_data)); + memcpy(ptr + 1, &operation_data, sizeof(operation_data)); } } // namespace @@ -50,11 +48,8 @@ header header::from_bytes(span bytes) { header result; auto ptr = bytes.data(); result.type = *reinterpret_cast(ptr); - uint32_t payload_len = 0; - memcpy(&payload_len, ptr + 1, 4); - result.payload_len = detail::from_network_order(payload_len); uint64_t operation_data; - memcpy(&operation_data, ptr + 5, 8); + memcpy(&operation_data, ptr + 1, 8); result.operation_data = detail::from_network_order(operation_data); return result; } diff --git a/libcaf_net/src/multiplexer.cpp b/libcaf_net/src/multiplexer.cpp index 52ab912c..b80bdb40 100644 --- a/libcaf_net/src/multiplexer.cpp +++ b/libcaf_net/src/multiplexer.cpp @@ -23,7 +23,6 @@ #include "caf/byte.hpp" #include "caf/config.hpp" #include "caf/error.hpp" -#include "caf/expected.hpp" #include "caf/logger.hpp" #include "caf/make_counted.hpp" #include "caf/net/middleman.hpp" @@ -31,8 +30,6 @@ #include "caf/net/pollset_updater.hpp" #include "caf/net/socket_manager.hpp" #include "caf/sec.hpp" -#include "caf/span.hpp" -#include "caf/variant.hpp" #ifndef CAF_WINDOWS # include diff --git a/libcaf_net/src/net/backend/test.cpp b/libcaf_net/src/net/backend/test.cpp index b77eb9a2..65b89bf9 100644 --- a/libcaf_net/src/net/backend/test.cpp +++ b/libcaf_net/src/net/backend/test.cpp @@ -22,9 +22,10 @@ #include "caf/net/actor_proxy_impl.hpp" #include "caf/net/basp/application.hpp" #include "caf/net/basp/ec.hpp" -#include "caf/net/make_endpoint_manager.hpp" +#include "caf/net/length_prefix_framing.hpp" #include "caf/net/middleman.hpp" #include "caf/net/multiplexer.hpp" +#include "caf/net/socket_manager.hpp" #include "caf/net/stream_transport.hpp" #include "caf/raise_error.hpp" #include "caf/sec.hpp" @@ -51,31 +52,30 @@ void test::stop() { peers_.clear(); } -endpoint_manager_ptr test::peer(const node_id& id) { - return get_peer(id).second; +socket_manager_ptr test::peer(const node_id& id) { + return std::get(get_peer(id)); } -expected test::get_or_connect(const uri& locator) { +expected test::get_or_connect(const uri& locator) { if (auto ptr = peer(make_node_id(*locator.authority_only()))) return ptr; - return make_error(sec::runtime_error, - "connecting not implemented in test backend"); + return make_error( + sec::runtime_error, + "connecting not implemented in test backend. `emplace` first."); } void test::resolve(const uri& locator, const actor& listener) { - auto id = locator.authority_only(); - if (id) - peer(make_node_id(*id))->resolve(locator, listener); - else + if (auto id = locator.authority_only()) { + auto basp_ptr = std::get(get_peer(make_node_id(*id))); + basp_ptr->resolve(locator.path(), listener); + } else { anon_send(listener, error(basp::ec::invalid_locator)); + } } strong_actor_ptr test::make_proxy(node_id nid, actor_id aid) { - using impl_type = actor_proxy_impl; - using hdl_type = strong_actor_ptr; - actor_config cfg; - return make_actor(aid, nid, &mm_.system(), cfg, - peer(nid)); + auto basp_ptr = std::get(get_peer(nid)); + return basp_ptr->make_proxy(nid, aid); } void test::set_last_hop(node_id*) { @@ -88,33 +88,30 @@ uint16_t test::port() const noexcept { test::peer_entry& test::emplace(const node_id& peer_id, stream_socket first, stream_socket second) { - using transport_type = stream_transport; if (auto err = nonblocking(second, true)) CAF_LOG_ERROR("nonblocking failed: " << err); - auto mpx = mm_.mpx(); - basp::application app{proxies_}; - auto mgr = make_endpoint_manager(mpx, mm_.system(), - transport_type{second, std::move(app)}); - if (auto err = mgr->init()) { + auto& mpx = mm_.mpx(); + auto mgr = make_socket_manager(second, &mpx, proxies_); + settings cfg; + if (auto err = mgr->init(cfg)) { CAF_LOG_ERROR("mgr->init() failed: " << err); CAF_RAISE_ERROR("mgr->init() failed"); } - mpx->register_reading(mgr); + auto basp_ptr = std::addressof(mgr->top_layer()); + const std::lock_guard lock(lock_); auto& result = peers_[peer_id]; - result = std::make_pair(first, std::move(mgr)); + result = std::make_tuple(first, std::move(mgr), basp_ptr); return result; } test::peer_entry& test::get_peer(const node_id& id) { + const std::lock_guard lock(lock_); auto i = peers_.find(id); if (i != peers_.end()) return i->second; - auto sockets = make_stream_socket_pair(); - if (!sockets) { - CAF_LOG_ERROR("make_stream_socket_pair failed: " << sockets.error()); - CAF_RAISE_ERROR("make_stream_socket_pair failed"); - } - return emplace(id, sockets->first, sockets->second); + CAF_LOG_ERROR(make_error(sec::runtime_error, "peer_entry not found")); + CAF_RAISE_ERROR("peer_entry not found"); } } // namespace caf::net::backend diff --git a/libcaf_net/src/net/endpoint_manager_queue.cpp b/libcaf_net/src/net/consumer_queue.cpp similarity index 64% rename from libcaf_net/src/net/endpoint_manager_queue.cpp rename to libcaf_net/src/net/consumer_queue.cpp index 6c96bbac..bd27c9f7 100644 --- a/libcaf_net/src/net/endpoint_manager_queue.cpp +++ b/libcaf_net/src/net/consumer_queue.cpp @@ -16,58 +16,57 @@ * http://www.boost.org/LICENSE_1_0.txt. * ******************************************************************************/ -#include "caf/net/endpoint_manager_queue.hpp" +#include "caf/net/consumer_queue.hpp" namespace caf::net { -endpoint_manager_queue::element::~element() { +consumer_queue::element::~element() { // nop } -endpoint_manager_queue::event::event(uri locator, actor listener) +consumer_queue::event::event(std::string path, actor listener) : element(element_type::event), - value(resolve_request{std::move(locator), std::move(listener)}) { + value(resolve_request{std::move(path), std::move(listener)}) { // nop } -endpoint_manager_queue::event::event(node_id peer, actor_id proxy_id) - : element(element_type::event), value(new_proxy{peer, proxy_id}) { +consumer_queue::event::event(actor_id proxy_id) + : element(element_type::event), value(new_proxy{proxy_id}) { // nop } -endpoint_manager_queue::event::event(node_id observing_peer, - actor_id local_actor_id, error reason) +consumer_queue::event::event(actor_id local_actor_id, error reason) : element(element_type::event), - value(local_actor_down{observing_peer, local_actor_id, std::move(reason)}) { + value(local_actor_down{local_actor_id, std::move(reason)}) { // nop } -endpoint_manager_queue::event::event(std::string tag, uint64_t id) +consumer_queue::event::event(std::string tag, uint64_t id) : element(element_type::event), value(timeout{std::move(tag), id}) { // nop } -endpoint_manager_queue::event::~event() { +consumer_queue::event::~event() { // nop } -size_t endpoint_manager_queue::event::task_size() const noexcept { +size_t consumer_queue::event::task_size() const noexcept { return 1; } -endpoint_manager_queue::message::message(mailbox_element_ptr msg, - strong_actor_ptr receiver) +consumer_queue::message::message(mailbox_element_ptr msg, + strong_actor_ptr receiver) : element(element_type::message), msg(std::move(msg)), receiver(std::move(receiver)) { // nop } -size_t endpoint_manager_queue::message::task_size() const noexcept { +size_t consumer_queue::message::task_size() const noexcept { return message_policy::task_size(*this); } -endpoint_manager_queue::message::~message() { +consumer_queue::message::~message() { // nop } diff --git a/libcaf_net/src/net/middleman.cpp b/libcaf_net/src/net/middleman.cpp index f2854fa5..aedac3fd 100644 --- a/libcaf_net/src/net/middleman.cpp +++ b/libcaf_net/src/net/middleman.cpp @@ -23,8 +23,8 @@ #include "caf/expected.hpp" #include "caf/init_global_meta_objects.hpp" #include "caf/net/basp/ec.hpp" -#include "caf/net/endpoint_manager.hpp" #include "caf/net/middleman_backend.hpp" +#include "caf/net/socket_manager.hpp" #include "caf/raise_error.hpp" #include "caf/sec.hpp" #include "caf/send.hpp" @@ -106,7 +106,7 @@ void middleman::add_module_options(actor_system_config& cfg) { .add("network-backend", "legacy option"); } -expected middleman::connect(const uri& locator) { +expected middleman::connect(const uri& locator) { if (auto ptr = backend(locator.scheme())) return ptr->get_or_connect(locator); else diff --git a/libcaf_net/test/accept_socket.cpp b/libcaf_net/test/accept_socket.cpp index ba987ddc..ac7f4113 100644 --- a/libcaf_net/test/accept_socket.cpp +++ b/libcaf_net/test/accept_socket.cpp @@ -21,9 +21,7 @@ #include "caf/net/tcp_accept_socket.hpp" #include "caf/binary_serializer.hpp" -#include "caf/net/endpoint_manager.hpp" #include "caf/net/ip.hpp" -#include "caf/net/make_endpoint_manager.hpp" #include "caf/net/multiplexer.hpp" #include "caf/net/socket_guard.hpp" #include "caf/net/tcp_stream_socket.hpp" diff --git a/libcaf_net/test/application.cpp b/libcaf_net/test/application.cpp index dbcd5b5c..572b0095 100644 --- a/libcaf_net/test/application.cpp +++ b/libcaf_net/test/application.cpp @@ -26,35 +26,59 @@ #include "caf/byte_buffer.hpp" #include "caf/forwarding_actor_proxy.hpp" -#include "caf/net/basp/connection_state.hpp" #include "caf/net/basp/constants.hpp" #include "caf/net/basp/ec.hpp" #include "caf/net/middleman.hpp" -#include "caf/net/packet_writer.hpp" -#include "caf/none.hpp" #include "caf/uri.hpp" +#include "caf/net/length_prefix_framing.hpp" + using namespace caf; using namespace caf::net; #define REQUIRE_OK(statement) \ - if (auto err = statement) \ - CAF_FAIL("failed to serialize data: " << err); + do { \ + if (auto err = statement) \ + CAF_FAIL("failed to serialize data: " << err); \ + } while (false) namespace { +struct dummy_socket_manager : public socket_manager { + dummy_socket_manager(socket handle, multiplexer* mpx) + : socket_manager(handle, mpx) { + // nop + } + + error init(const settings&) override { + return none; + } + + bool handle_read_event() override { + return false; + } + + bool handle_write_event() override { + return false; + } + + void handle_error(sec) override { + // nop + } +}; + struct config : actor_system_config { config() { net::middleman::add_module_options(*this); } }; -struct fixture : test_coordinator_fixture, - proxy_registry::backend, - basp::application::test_tag, - public packet_writer { - fixture() : proxies(sys, *this), app(proxies) { - REQUIRE_OK(app.init(*this)); +struct fixture : test_coordinator_fixture, proxy_registry::backend { + fixture() : mm(sys), mpx(&mm), proxies(sys, *this), app(proxies) { + dummy_socket_manager dummy_mgr{socket{42}, &mpx}; + settings cfg; + auto this_layer_ptr = make_message_oriented_layer_ptr(this, this); + REQUIRE_OK(app.init(&dummy_mgr, this_layer_ptr, cfg)); uri mars_uri; REQUIRE_OK(parse("tcp://mars", mars_uri)); mars = make_node_id(mars_uri); @@ -63,8 +87,8 @@ struct fixture : test_coordinator_fixture, template byte_buffer to_buf(const Ts&... xs) { byte_buffer buf; - binary_serializer sink{system(), buf}; - REQUIRE_OK(sink(xs...)); + binary_serializer sink{sys, buf}; + REQUIRE_OK(!sink.apply_objects(xs...)); return buf; } @@ -74,54 +98,52 @@ struct fixture : test_coordinator_fixture, } void handle_handshake() { - CAF_CHECK_EQUAL(app.state(), - basp::connection_state::await_handshake_header); - auto payload = to_buf(mars, basp::application::default_app_ids()); - set_input(basp::header{basp::message_type::handshake, - static_cast(payload.size()), - basp::version}); - REQUIRE_OK(app.handle_data(*this, input)); - CAF_CHECK_EQUAL(app.state(), - basp::connection_state::await_handshake_payload); - REQUIRE_OK(app.handle_data(*this, payload)); + CAF_CHECK(!app.handshake_complete()); + auto app_ids = basp::application::default_app_ids(); + set_input(basp::header{basp::message_type::handshake, basp::version}, mars, + app_ids); + CAF_MESSAGE("set_input done"); + auto this_layer_ptr = make_message_oriented_layer_ptr(this, this); + CAF_REQUIRE_GREATER_OR_EQUAL(app.consume(this_layer_ptr, input), 0); + CAF_CHECK(app.handshake_complete()); } void consume_handshake() { if (output.size() < basp::header_size) CAF_FAIL("BASP application did not write a handshake header"); auto hdr = basp::header::from_bytes(output); - if (hdr.type != basp::message_type::handshake || hdr.payload_len == 0 + if (hdr.type != basp::message_type::handshake || hdr.operation_data != basp::version) CAF_FAIL("invalid handshake header"); node_id nid; std::vector app_ids; binary_deserializer source{sys, output}; source.skip(basp::header_size); - if (auto err = source(nid, app_ids)) - CAF_FAIL("unable to deserialize payload: " << err); + if (!source.apply_objects(nid, app_ids)) + CAF_FAIL("unable to deserialize payload: " << source.get_error()); if (source.remaining() > 0) CAF_FAIL("trailing bytes after reading payload"); output.clear(); } - actor_system& system() { - return sys; - } - - fixture& transport() { - return *this; + template + bool can_send_more(LowerLayerPtr&) { + return true; } - endpoint_manager& manager() { - CAF_FAIL("unexpected function call"); + template + void begin_message(LowerLayerPtr&) { + // nop } - byte_buffer next_payload_buffer() override { - return {}; + template + byte_buffer& message_buffer(LowerLayerPtr&) { + return output; } - byte_buffer next_header_buffer() override { - return {}; + template + void end_message(LowerLayerPtr&) { + // nop } template @@ -129,6 +151,16 @@ struct fixture : test_coordinator_fixture, // nop } + template + void timeout(LowerLayerPtr&, std::string, uint64_t) { + // nop + } + + template + void abort_reason(LowerLayerPtr&, const error& err) { + last_error = err; + } + strong_actor_ptr make_proxy(node_id nid, actor_id aid) override { using impl_type = forwarding_actor_proxy; using hdl_type = strong_actor_ptr; @@ -141,10 +173,9 @@ struct fixture : test_coordinator_fixture, } protected: - void write_impl(span buffers) override { - for (auto buf : buffers) - output.insert(output.end(), buf->begin(), buf->end()); - } + middleman mm; + + multiplexer mpx; byte_buffer input; @@ -155,26 +186,27 @@ struct fixture : test_coordinator_fixture, proxy_registry proxies; basp::application app; + + error last_error; }; } // namespace #define MOCK(kind, op, ...) \ do { \ - auto payload = to_buf(__VA_ARGS__); \ - set_input(basp::header{kind, static_cast(payload.size()), op}); \ - if (auto err = app.handle_data(*this, input)) \ - CAF_FAIL("application-under-test failed to process header: " << err); \ - if (auto err = app.handle_data(*this, payload)) \ - CAF_FAIL("application-under-test failed to process payload: " << err); \ + set_input(basp::header{kind, op}, __VA_ARGS__); \ + auto this_layer_ptr = make_message_oriented_layer_ptr(this, this); \ + if (app.consume(this_layer_ptr, input) < 0) \ + CAF_FAIL( \ + "application-under-test failed to process message: " << last_error); \ } while (false) #define RECEIVE(msg_type, op_data, ...) \ do { \ binary_deserializer source{sys, output}; \ basp::header hdr; \ - if (auto err = source(hdr, __VA_ARGS__)) \ - CAF_FAIL("failed to receive data: " << err); \ + if (!source.apply_objects(hdr, __VA_ARGS__)) \ + CAF_FAIL("failed to receive data: " << source.get_error()); \ if (source.remaining() != 0) \ CAF_FAIL("unable to read entire message, " << source.remaining() \ << " bytes left in buffer"); \ @@ -186,59 +218,53 @@ struct fixture : test_coordinator_fixture, CAF_TEST_FIXTURE_SCOPE(application_tests, fixture) CAF_TEST(missing handshake) { - CAF_CHECK_EQUAL(app.state(), basp::connection_state::await_handshake_header); - set_input(basp::header{basp::message_type::heartbeat, 0, 0}); - CAF_CHECK_EQUAL(app.handle_data(*this, input), basp::ec::missing_handshake); + CAF_CHECK(!app.handshake_complete()); + set_input(basp::header{basp::message_type::heartbeat, 0}); + auto this_layer_ptr = make_message_oriented_layer_ptr(this, this); + CAF_CHECK_LESS(app.consume(this_layer_ptr, input), 0); + CAF_CHECK_EQUAL(last_error, basp::ec::missing_handshake); } CAF_TEST(version mismatch) { - CAF_CHECK_EQUAL(app.state(), basp::connection_state::await_handshake_header); - set_input(basp::header{basp::message_type::handshake, 0, 0}); - CAF_CHECK_EQUAL(app.handle_data(*this, input), basp::ec::version_mismatch); -} - -CAF_TEST(missing payload in handshake) { - CAF_CHECK_EQUAL(app.state(), basp::connection_state::await_handshake_header); - set_input(basp::header{basp::message_type::handshake, 0, basp::version}); - CAF_CHECK_EQUAL(app.handle_data(*this, input), basp::ec::missing_payload); + CAF_CHECK(!app.handshake_complete()); + set_input(basp::header{basp::message_type::handshake, 0}); + auto this_layer_ptr = make_message_oriented_layer_ptr(this, this); + CAF_CHECK_LESS(app.consume(this_layer_ptr, input), 0); + CAF_CHECK_EQUAL(last_error, basp::ec::version_mismatch); } CAF_TEST(invalid handshake) { - CAF_CHECK_EQUAL(app.state(), basp::connection_state::await_handshake_header); + CAF_CHECK(!app.handshake_complete()); node_id no_nid; std::vector no_ids; - auto payload = to_buf(no_nid, no_ids); - set_input(basp::header{basp::message_type::handshake, - static_cast(payload.size()), basp::version}); - REQUIRE_OK(app.handle_data(*this, input)); - CAF_CHECK_EQUAL(app.state(), basp::connection_state::await_handshake_payload); - CAF_CHECK_EQUAL(app.handle_data(*this, payload), basp::ec::invalid_handshake); + set_input(basp::header{basp::message_type::handshake, basp::version}, no_nid, + no_ids); + auto this_layer_ptr = make_message_oriented_layer_ptr(this, this); + CAF_CHECK_LESS(app.consume(this_layer_ptr, input), 0); + CAF_CHECK_EQUAL(last_error, basp::ec::invalid_handshake); } CAF_TEST(app identifier mismatch) { - CAF_CHECK_EQUAL(app.state(), basp::connection_state::await_handshake_header); + CAF_CHECK(!app.handshake_complete()); std::vector wrong_ids{"YOLO!!!"}; - auto payload = to_buf(mars, wrong_ids); - set_input(basp::header{basp::message_type::handshake, - static_cast(payload.size()), basp::version}); - REQUIRE_OK(app.handle_data(*this, input)); - CAF_CHECK_EQUAL(app.state(), basp::connection_state::await_handshake_payload); - CAF_CHECK_EQUAL(app.handle_data(*this, payload), - basp::ec::app_identifiers_mismatch); + set_input(basp::header{basp::message_type::handshake, basp::version}, mars, + wrong_ids); + auto this_layer_ptr = make_message_oriented_layer_ptr(this, this); + CAF_CHECK_LESS(app.consume(this_layer_ptr, input), 0); + CAF_CHECK_EQUAL(last_error, basp::ec::app_identifiers_mismatch); } CAF_TEST(repeated handshake) { handle_handshake(); consume_handshake(); - CAF_CHECK_EQUAL(app.state(), basp::connection_state::await_header); + CAF_CHECK(app.handshake_complete()); node_id no_nid; std::vector no_ids; - auto payload = to_buf(no_nid, no_ids); - set_input(basp::header{basp::message_type::handshake, - static_cast(payload.size()), basp::version}); - CAF_CHECK_EQUAL(app.handle_data(*this, input), none); - CAF_CHECK_EQUAL(app.handle_data(*this, payload), - basp::ec::unexpected_handshake); + set_input(basp::header{basp::message_type::handshake, basp::version}, no_nid, + no_ids); + auto this_layer_ptr = make_message_oriented_layer_ptr(this, this); + CAF_CHECK_LESS(app.consume(this_layer_ptr, input), 0); + CAF_CHECK_EQUAL(last_error, basp::ec::unexpected_handshake); } CAF_TEST(actor message) { @@ -256,9 +282,8 @@ CAF_TEST(actor message) { CAF_TEST(resolve request without result) { handle_handshake(); consume_handshake(); - CAF_CHECK_EQUAL(app.state(), basp::connection_state::await_header); + CAF_CHECK(app.handshake_complete()); MOCK(basp::message_type::resolve_request, 42, std::string{"foo/bar"}); - CAF_CHECK_EQUAL(app.state(), basp::connection_state::await_header); actor_id aid; std::set ifs; RECEIVE(basp::message_type::resolve_response, 42u, aid, ifs); @@ -271,9 +296,8 @@ CAF_TEST(resolve request on id with result) { consume_handshake(); sys.registry().put(self->id(), self); auto path = "id/" + std::to_string(self->id()); - CAF_CHECK_EQUAL(app.state(), basp::connection_state::await_header); + CAF_CHECK(app.handshake_complete()); MOCK(basp::message_type::resolve_request, 42, path); - CAF_CHECK_EQUAL(app.state(), basp::connection_state::await_header); actor_id aid; std::set ifs; RECEIVE(basp::message_type::resolve_response, 42u, aid, ifs); @@ -286,9 +310,8 @@ CAF_TEST(resolve request on name with result) { consume_handshake(); sys.registry().put("foo", self); std::string path = "name/foo"; - CAF_CHECK_EQUAL(app.state(), basp::connection_state::await_header); + CAF_CHECK(app.handshake_complete()); MOCK(basp::message_type::resolve_request, 42, path); - CAF_CHECK_EQUAL(app.state(), basp::connection_state::await_header); actor_id aid; std::set ifs; RECEIVE(basp::message_type::resolve_response, 42u, aid, ifs); @@ -299,7 +322,10 @@ CAF_TEST(resolve request on name with result) { CAF_TEST(resolve response with invalid actor handle) { handle_handshake(); consume_handshake(); - app.resolve(*this, "foo/bar", self); + CAF_CHECK(app.handshake_complete()); + app.resolve("foo/bar", self); + auto this_layer_ptr = make_message_oriented_layer_ptr(this, this); + CAF_CHECK_GREATER_OR_EQUAL(app.prepare_send(this_layer_ptr), 0); std::string path; RECEIVE(basp::message_type::resolve_request, 1u, path); CAF_CHECK_EQUAL(path, "foo/bar"); @@ -315,7 +341,10 @@ CAF_TEST(resolve response with invalid actor handle) { CAF_TEST(resolve response with valid actor handle) { handle_handshake(); consume_handshake(); - app.resolve(*this, "foo/bar", self); + CAF_CHECK(app.handshake_complete()); + app.resolve("foo/bar", self); + auto this_layer_ptr = make_message_oriented_layer_ptr(this, this); + CAF_CHECK_GREATER_OR_EQUAL(app.prepare_send(this_layer_ptr), 0); std::string path; RECEIVE(basp::message_type::resolve_request, 1u, path); CAF_CHECK_EQUAL(path, "foo/bar"); @@ -332,11 +361,12 @@ CAF_TEST(resolve response with valid actor handle) { CAF_TEST(heartbeat message) { handle_handshake(); consume_handshake(); - CAF_CHECK_EQUAL(app.state(), basp::connection_state::await_header); - auto bytes = to_bytes(basp::header{basp::message_type::heartbeat, 0, 0}); + CAF_CHECK(app.handshake_complete()); + auto bytes = to_bytes(basp::header{basp::message_type::heartbeat, 0}); set_input(bytes); - REQUIRE_OK(app.handle_data(*this, input)); - CAF_CHECK_EQUAL(app.state(), basp::connection_state::await_header); + auto this_layer_ptr = make_message_oriented_layer_ptr(this, this); + CAF_REQUIRE_GREATER(app.consume(this_layer_ptr, input), 0); + CAF_CHECK_EQUAL(last_error, none); } CAF_TEST_FIXTURE_SCOPE_END() diff --git a/libcaf_net/test/endpoint_manager.cpp b/libcaf_net/test/endpoint_manager.cpp deleted file mode 100644 index c42f5784..00000000 --- a/libcaf_net/test/endpoint_manager.cpp +++ /dev/null @@ -1,229 +0,0 @@ -/****************************************************************************** - * ____ _ _____ * - * / ___| / \ | ___| C++ * - * | | / _ \ | |_ Actor * - * | |___ / ___ \| _| Framework * - * \____/_/ \_|_| * - * * - * Copyright 2011-2019 Dominik Charousset * - * * - * Distributed under the terms and conditions of the BSD 3-Clause License or * - * (at your option) under the terms and conditions of the Boost Software * - * License 1.0. See accompanying files LICENSE and LICENSE_ALTERNATIVE. * - * * - * If you did not receive a copy of the license files, see * - * http://opensource.org/licenses/BSD-3-Clause and * - * http://www.boost.org/LICENSE_1_0.txt. * - ******************************************************************************/ - -#define CAF_SUITE endpoint_manager - -#include "caf/net/endpoint_manager.hpp" - -#include "caf/net/test/host_fixture.hpp" -#include "caf/test/dsl.hpp" - -#include "caf/binary_deserializer.hpp" -#include "caf/binary_serializer.hpp" -#include "caf/byte_buffer.hpp" -#include "caf/detail/scope_guard.hpp" -#include "caf/make_actor.hpp" -#include "caf/net/actor_proxy_impl.hpp" -#include "caf/net/make_endpoint_manager.hpp" -#include "caf/net/multiplexer.hpp" -#include "caf/net/stream_socket.hpp" -#include "caf/node_id.hpp" -#include "caf/span.hpp" - -using namespace caf; -using namespace caf::net; - -namespace { - -using byte_buffer_ptr = std::shared_ptr; - -string_view hello_manager{"hello manager!"}; - -string_view hello_test{"hello test!"}; - -struct fixture : test_coordinator_fixture<>, host_fixture { - fixture() { - mpx = std::make_shared(); - mpx->set_thread_id(); - if (auto err = mpx->init()) - CAF_FAIL("mpx->init failed: " << err); - if (mpx->num_socket_managers() != 1) - CAF_FAIL("mpx->num_socket_managers() != 1"); - } - - bool handle_io_event() override { - return mpx->poll_once(false); - } - - multiplexer_ptr mpx; -}; - -class dummy_application { - // nop -}; - -class dummy_transport { -public: - using application_type = dummy_application; - - dummy_transport(stream_socket handle, byte_buffer_ptr data) - : handle_(handle), data_(data), read_buf_(1024) { - // nop - } - - stream_socket handle() { - return handle_; - } - - template - error init(Manager& manager) { - auto test_bytes = as_bytes(make_span(hello_test)); - buf_.insert(buf_.end(), test_bytes.begin(), test_bytes.end()); - manager.register_writing(); - return none; - } - - template - bool handle_read_event(Manager&) { - auto num_bytes = read(handle_, read_buf_); - if (num_bytes > 0) { - data_->insert(data_->end(), read_buf_.begin(), - read_buf_.begin() + num_bytes); - return true; - } - return num_bytes < 0 && last_socket_error_is_temporary(); - } - - template - bool handle_write_event(Manager& mgr) { - for (auto x = mgr.next_message(); x != nullptr; x = mgr.next_message()) { - binary_serializer sink{mgr.system(), buf_}; - if (auto err = sink(x->msg->payload)) - CAF_FAIL("serializing failed: " << err); - } - auto num_bytes = write(handle_, buf_); - if (num_bytes > 0) { - buf_.erase(buf_.begin(), buf_.begin() + num_bytes); - return buf_.size() > 0; - } - return num_bytes < 0 && last_socket_error_is_temporary(); - } - - void handle_error(sec) { - // nop - } - - template - void resolve(Manager& mgr, const uri& locator, const actor& listener) { - actor_id aid = 42; - auto hid = string_view("0011223344556677889900112233445566778899"); - auto nid = unbox(make_node_id(42, hid)); - actor_config cfg; - auto p = make_actor( - aid, nid, &mgr.system(), cfg, &mgr); - std::string path{locator.path().begin(), locator.path().end()}; - anon_send(listener, resolve_atom_v, std::move(path), p); - } - - template - void timeout(Manager&, const std::string&, uint64_t) { - // nop - } - - template - void new_proxy(Parent&, const node_id&, actor_id) { - // nop - } - - template - void local_actor_down(Parent&, const node_id&, actor_id, error) { - // nop - } - -private: - stream_socket handle_; - - byte_buffer_ptr data_; - - byte_buffer read_buf_; - - byte_buffer buf_; -}; - -} // namespace - -CAF_TEST_FIXTURE_SCOPE(endpoint_manager_tests, fixture) - -CAF_TEST(send and receive) { - byte_buffer read_buf(1024); - auto buf = std::make_shared(); - auto sockets = unbox(make_stream_socket_pair()); - CAF_CHECK_EQUAL(nonblocking(sockets.second, true), none); - CAF_CHECK_LESS(read(sockets.second, read_buf), 0); - CAF_CHECK(last_socket_error_is_temporary()); - auto guard = detail::make_scope_guard([&] { close(sockets.second); }); - auto mgr = make_endpoint_manager(mpx, sys, - dummy_transport{sockets.first, buf}); - CAF_CHECK_EQUAL(mgr->mask(), operation::none); - CAF_CHECK_EQUAL(mgr->init(), none); - CAF_CHECK_EQUAL(mgr->mask(), operation::read_write); - CAF_CHECK_EQUAL(mpx->num_socket_managers(), 2u); - CAF_CHECK_EQUAL(write(sockets.second, as_bytes(make_span(hello_manager))), - hello_manager.size()); - run(); - CAF_CHECK_EQUAL(string_view(reinterpret_cast(buf->data()), - buf->size()), - hello_manager); - CAF_CHECK_EQUAL(read(sockets.second, read_buf), hello_test.size()); - CAF_CHECK_EQUAL(string_view(reinterpret_cast(read_buf.data()), - hello_test.size()), - hello_test); -} - -CAF_TEST(resolve and proxy communication) { - byte_buffer read_buf(1024); - auto buf = std::make_shared(); - auto sockets = unbox(make_stream_socket_pair()); - CAF_CHECK_EQUAL(nonblocking(sockets.second, true), none); - auto guard = detail::make_scope_guard([&] { close(sockets.second); }); - auto mgr = make_endpoint_manager(mpx, sys, - dummy_transport{sockets.first, buf}); - CAF_CHECK_EQUAL(mgr->init(), none); - CAF_CHECK_EQUAL(mgr->mask(), operation::read_write); - run(); - CAF_CHECK_EQUAL(read(sockets.second, read_buf), hello_test.size()); - mgr->resolve(unbox(make_uri("test:id/42")), self); - run(); - self->receive( - [&](resolve_atom, const std::string&, const strong_actor_ptr& p) { - CAF_MESSAGE("got a proxy, send a message to it"); - self->send(actor_cast(p), "hello proxy!"); - }, - after(std::chrono::seconds(0)) >> - [&] { CAF_FAIL("manager did not respond with a proxy."); }); - run(); - auto read_res = read(sockets.second, read_buf); - if (read_res <= 0) { - std::string msg = "socket closed"; - if (read_res < 0) - msg = last_socket_error_as_string(); - CAF_ERROR("read() failed: " << msg); - return; - } - read_buf.resize(static_cast(read_res)); - CAF_MESSAGE("receive buffer contains " << read_buf.size() << " bytes"); - message msg; - binary_deserializer source{sys, read_buf}; - CAF_CHECK_EQUAL(source(msg), none); - if (msg.match_elements()) - CAF_CHECK_EQUAL(msg.get_as(0), "hello proxy!"); - else - CAF_ERROR("expected a string, got: " << to_string(msg)); -} - -CAF_TEST_FIXTURE_SCOPE_END() diff --git a/libcaf_net/test/header.cpp b/libcaf_net/test/header.cpp index 1267d1d4..218934ce 100644 --- a/libcaf_net/test/header.cpp +++ b/libcaf_net/test/header.cpp @@ -31,7 +31,7 @@ using namespace caf; using namespace caf::net; CAF_TEST(serialization) { - basp::header x{basp::message_type::handshake, 42, 4}; + basp::header x{basp::message_type::handshake, 4}; byte_buffer buf; { binary_serializer sink{nullptr, buf}; @@ -53,6 +53,6 @@ CAF_TEST(serialization) { } CAF_TEST(to_string) { - basp::header x{basp::message_type::handshake, 42, 4}; - CAF_CHECK_EQUAL(deep_to_string(x), "basp::header(handshake, 42, 4)"); + basp::header x{basp::message_type::handshake, 4}; + CAF_CHECK_EQUAL(deep_to_string(x), "basp::header(handshake, 4)"); } diff --git a/libcaf_net/test/net/basp/ping_pong.cpp b/libcaf_net/test/net/basp/ping_pong.cpp index 0fb66ee8..3a4d60dc 100644 --- a/libcaf_net/test/net/basp/ping_pong.cpp +++ b/libcaf_net/test/net/basp/ping_pong.cpp @@ -69,7 +69,7 @@ template class planet : public test_coordinator_fixture> { public: planet(planet_driver& driver) - : mpx(*this->sys.network_manager().mpx()), driver_(driver) { + : mpx(this->sys.network_manager().mpx()), driver_(driver) { mpx.set_thread_id(); } @@ -97,7 +97,8 @@ class planet : public test_coordinator_fixture> { actor resolve(string_view locator) { auto hdl = actor_cast(this->self); this->sys.network_manager().resolve(unbox(make_uri(locator)), hdl); - this->run(); + while (handle_io_event()) + ; actor result; this->self->receive( [&](strong_actor_ptr& ptr, const std::set&) { diff --git a/libcaf_net/test/net/basp/worker.cpp b/libcaf_net/test/net/basp/worker.cpp index 53bed76d..cdec53d9 100644 --- a/libcaf_net/test/net/basp/worker.cpp +++ b/libcaf_net/test/net/basp/worker.cpp @@ -111,11 +111,10 @@ CAF_TEST(deliver serialized message) { byte_buffer payload; std::vector stages; binary_serializer sink{sys, payload}; - if (auto err = sink(node_id{}, self->id(), testee.id(), stages, - make_message(ok_atom_v))) - CAF_FAIL("unable to serialize message: " << err); + if (!sink.apply_objects(node_id{}, self->id(), testee.id(), stages, + make_message(ok_atom_v))) + CAF_FAIL("unable to serialize message: " << sink.get_error()); net::basp::header hdr{net::basp::message_type::actor_message, - static_cast(payload.size()), make_message_id().integer_value()}; CAF_MESSAGE("launch worker"); w->launch(last_hop, hdr, payload); diff --git a/libcaf_net/test/net/length_prefix_framing.cpp b/libcaf_net/test/net/length_prefix_framing.cpp index 41669261..332c3642 100644 --- a/libcaf_net/test/net/length_prefix_framing.cpp +++ b/libcaf_net/test/net/length_prefix_framing.cpp @@ -31,10 +31,13 @@ #include "caf/byte_buffer.hpp" #include "caf/byte_span.hpp" #include "caf/detail/network_order.hpp" +#include "caf/net/receive_policy.hpp" +#include "caf/net/stream_oriented_layer_ptr.hpp" #include "caf/span.hpp" #include "caf/tag/message_oriented.hpp" using namespace caf; +using namespace caf::net; namespace { @@ -43,6 +46,11 @@ namespace { struct ul_expect_messages { using input_tag = tag::message_oriented; + template + error init(socket_manager*, LowerLayerPtr, const settings&) { + return none; + } + void set_expected_messages(std::vector messages) { expected_messages.clear(); for (auto& msg : messages) @@ -87,30 +95,34 @@ struct ll_provide_stream_for_messages { } void run() { + auto this_layer_ptr = make_stream_oriented_layer_ptr(this, this); + settings cfg; + CAF_CHECK_EQUAL(upper_layer.init(nullptr, this_layer_ptr, cfg), none); CAF_CHECK(data_stream.size() != 0); while (processed != data_stream.size()) { - auto all_data = make_span(data_stream.data() + processed, - data_stream.size() - processed); - auto new_data = make_span(data_stream.data() + offered, - data_stream.size() - offered); - auto newly_offered = new_data.size(); - auto consumed = upper_layer.consume(*this, all_data, new_data); - CAF_CHECK(consumed >= 0); + auto all_data = make_span(data_stream.data(), min_read_size); + auto new_data = make_span(data_stream.data(), min_read_size); + CAF_MESSAGE("offering " << min_read_size << " bytes"); + auto consumed = upper_layer.consume(this_layer_ptr, all_data, new_data); + CAF_MESSAGE("Layer consumed " << consumed << " bytes"); + CAF_REQUIRE(consumed >= 0); CAF_CHECK(static_cast(consumed) <= data_stream.size()); - offered += newly_offered; processed += consumed; - if (consumed > 0) { + if (consumed > 0) data_stream.erase(data_stream.begin(), data_stream.begin() + consumed); - offered -= processed; - processed = 0; - } if (consumed == 0 || data_stream.size() == 0) return; } } + template + void configure_read(LowerLayerPtr&, receive_policy policy) { + min_read_size = policy.min_size; + } + + size_t min_read_size = 0; + size_t processed = 0; - size_t offered = 0; std::vector data_stream; @@ -186,28 +198,4 @@ CAF_TEST(process messages) { test_receive_data(); } -CAF_TEST(incomplete message) { - generate_messages(1, 1000); - CAF_MESSAGE("data.size() = " << data.size()); - auto initial_size = data.size(); - auto data_copy = data; - auto mid = data.size() / 2; - data.resize(mid); - CAF_MESSAGE("data.size() = " << data.size()); - data_copy.erase(data_copy.begin(), data_copy.begin() + mid); - CAF_MESSAGE("data_copy.size() = " << data_copy.size()); - CAF_REQUIRE(data.size() + data_copy.size() == initial_size); - // Don't set expectations because there shouldn't be a complete message - // in the bytes. - auto messages_copy = messages; - messages.clear(); - CAF_REQUIRE(messages.empty()); - set_expectations(); - test_receive_data(); - data.insert(data.end(), data_copy.begin(), data_copy.end()); - messages = messages_copy; - set_expectations(); - test_receive_data(); -} - CAF_TEST_FIXTURE_SCOPE_END() diff --git a/libcaf_net/test/net/web_socket_server.cpp b/libcaf_net/test/net/web_socket_server.cpp index e434f1dc..cb52fd49 100644 --- a/libcaf_net/test/net/web_socket_server.cpp +++ b/libcaf_net/test/net/web_socket_server.cpp @@ -197,7 +197,8 @@ CAF_TEST(handshakes may arrive in chunks) { CAF_CHECK_EQUAL(transport.handle_input(), 0u); CAF_CHECK(!ws->handshake_complete()); transport.push(bufs[2]); - CAF_CHECK_EQUAL(transport.handle_input(), opening_handshake.size()); + CAF_CHECK_EQUAL(static_cast(transport.handle_input()), + opening_handshake.size()); CAF_CHECK(ws->handshake_complete()); } diff --git a/libcaf_net/test/pipe_socket.cpp b/libcaf_net/test/pipe_socket.cpp index ccc4913c..5dc6270a 100644 --- a/libcaf_net/test/pipe_socket.cpp +++ b/libcaf_net/test/pipe_socket.cpp @@ -39,8 +39,10 @@ CAF_TEST(send and receive) { pipe_socket rd_sock; pipe_socket wr_sock; std::tie(rd_sock, wr_sock) = unbox(make_pipe()); - CAF_CHECK_EQUAL(write(wr_sock, send_buf), send_buf.size()); - CAF_CHECK_EQUAL(read(rd_sock, receive_buf), send_buf.size()); + CAF_CHECK_EQUAL(static_cast(write(wr_sock, send_buf)), + send_buf.size()); + CAF_CHECK_EQUAL(static_cast(read(rd_sock, receive_buf)), + send_buf.size()); CAF_CHECK(std::equal(send_buf.begin(), send_buf.end(), receive_buf.begin())); } diff --git a/libcaf_net/test/stream_socket.cpp b/libcaf_net/test/stream_socket.cpp index 36a869ef..11e6a009 100644 --- a/libcaf_net/test/stream_socket.cpp +++ b/libcaf_net/test/stream_socket.cpp @@ -84,16 +84,16 @@ CAF_TEST(read on empty sockets) { CAF_TEST(transfer data from first to second socket) { byte_buffer wr_buf{1_b, 2_b, 4_b, 8_b, 16_b, 32_b, 64_b}; CAF_MESSAGE("transfer data from first to second socket"); - CAF_CHECK_EQUAL(write(first, wr_buf), wr_buf.size()); - CAF_CHECK_EQUAL(read(second, rd_buf), wr_buf.size()); + CAF_CHECK_EQUAL(static_cast(write(first, wr_buf)), wr_buf.size()); + CAF_CHECK_EQUAL(static_cast(read(second, rd_buf)), wr_buf.size()); CAF_CHECK(std::equal(wr_buf.begin(), wr_buf.end(), rd_buf.begin())); rd_buf.assign(rd_buf.size(), byte(0)); } CAF_TEST(transfer data from second to first socket) { byte_buffer wr_buf{1_b, 2_b, 4_b, 8_b, 16_b, 32_b, 64_b}; - CAF_CHECK_EQUAL(write(second, wr_buf), wr_buf.size()); - CAF_CHECK_EQUAL(read(first, rd_buf), wr_buf.size()); + CAF_CHECK_EQUAL(static_cast(write(second, wr_buf)), wr_buf.size()); + CAF_CHECK_EQUAL(static_cast(read(first, rd_buf)), wr_buf.size()); CAF_CHECK(std::equal(wr_buf.begin(), wr_buf.end(), rd_buf.begin())); } @@ -109,9 +109,10 @@ CAF_TEST(transfer data using multiple buffers) { byte_buffer full_buf; full_buf.insert(full_buf.end(), wr_buf_1.begin(), wr_buf_1.end()); full_buf.insert(full_buf.end(), wr_buf_2.begin(), wr_buf_2.end()); - CAF_CHECK_EQUAL(write(second, {make_span(wr_buf_1), make_span(wr_buf_2)}), + CAF_CHECK_EQUAL(static_cast( + write(second, {make_span(wr_buf_1), make_span(wr_buf_2)})), full_buf.size()); - CAF_CHECK_EQUAL(read(first, rd_buf), full_buf.size()); + CAF_CHECK_EQUAL(static_cast(read(first, rd_buf)), full_buf.size()); CAF_CHECK(std::equal(full_buf.begin(), full_buf.end(), rd_buf.begin())); } diff --git a/libcaf_net/test/stream_transport.cpp b/libcaf_net/test/stream_transport.cpp index bb6fe861..482ed857 100644 --- a/libcaf_net/test/stream_transport.cpp +++ b/libcaf_net/test/stream_transport.cpp @@ -23,8 +23,6 @@ #include "caf/net/test/host_fixture.hpp" #include "caf/test/dsl.hpp" -#include "caf/binary_deserializer.hpp" -#include "caf/binary_serializer.hpp" #include "caf/byte.hpp" #include "caf/byte_buffer.hpp" #include "caf/detail/scope_guard.hpp" @@ -118,19 +116,6 @@ class dummy_application { return recv_buf_->size(); } - template - void resolve(ParentPtr parent, string_view path, const actor& listener) { - actor_id aid = 42; - auto hid = string_view("0011223344556677889900112233445566778899"); - auto nid = unbox(make_node_id(42, hid)); - actor_config cfg; - endpoint_manager_ptr ptr{&parent->manager()}; - auto p = make_actor( - aid, nid, &parent->system(), cfg, std::move(ptr)); - anon_send(listener, resolve_atom_v, std::string{path.begin(), path.end()}, - p); - } - static void handle_error(sec code) { CAF_FAIL("handle_error called with " << CAF_ARG(code)); }