From ec510969c8660e4ab0d97f8cfddc757cbf7d6121 Mon Sep 17 00:00:00 2001 From: Daniel Larimer Date: Wed, 29 Apr 2026 23:37:41 -0500 Subject: [PATCH] Gracefully stop node on shutdown signal Run the node task and shutdown-signal task separately so SIGINT requests an orderly stop before the process joins its context pool. Propagate request_stop through sync, sentry, snapshot sync, gRPC servers, resource usage logging, and staged execution, and treat expected cancellation paths as graceful shutdown instead of fatal errors. --- cmd/silkworm.cpp | 53 ++++++++++++++++--- silkworm/db/snapshot_sync.cpp | 10 +++- silkworm/db/snapshot_sync.hpp | 2 + .../execution/api/active_direct_service.hpp | 3 +- silkworm/execution/grpc/server/server.cpp | 4 ++ silkworm/execution/grpc/server/server.hpp | 1 + silkworm/infra/concurrency/channel.hpp | 11 ++-- silkworm/infra/grpc/server/server.hpp | 5 +- silkworm/node/node.cpp | 24 ++++++++- silkworm/node/node.hpp | 1 + silkworm/node/resource_usage.cpp | 16 +++++- silkworm/node/resource_usage.hpp | 12 ++++- silkworm/node/stagedsync/execution_engine.cpp | 6 +++ silkworm/node/stagedsync/execution_engine.hpp | 1 + .../node/stagedsync/execution_pipeline.cpp | 4 +- silkworm/node/stagedsync/forks/main_chain.cpp | 4 ++ silkworm/node/stagedsync/forks/main_chain.hpp | 1 + .../sentry/discovery/disc_v4/discovery.cpp | 10 ++++ .../sentry/discovery/disc_v4/discovery.hpp | 1 + silkworm/sentry/discovery/disc_v4/server.cpp | 12 +++++ silkworm/sentry/discovery/disc_v4/server.hpp | 1 + silkworm/sentry/discovery/discovery.cpp | 9 ++++ silkworm/sentry/discovery/discovery.hpp | 1 + silkworm/sentry/grpc/server/server.cpp | 4 ++ silkworm/sentry/grpc/server/server.hpp | 1 + silkworm/sentry/message_receiver.cpp | 7 +++ silkworm/sentry/message_receiver.hpp | 1 + silkworm/sentry/message_sender.hpp | 1 + silkworm/sentry/peer_discovery_feedback.hpp | 1 + silkworm/sentry/peer_manager.cpp | 11 ++++ silkworm/sentry/peer_manager.hpp | 1 + silkworm/sentry/peer_manager_api.cpp | 12 +++++ silkworm/sentry/peer_manager_api.hpp | 1 + silkworm/sentry/rlpx/server.cpp | 5 ++ silkworm/sentry/rlpx/server.hpp | 1 + silkworm/sentry/sentry.cpp | 33 +++++++++--- silkworm/sentry/sentry.hpp | 1 + silkworm/sentry/status_manager.hpp | 1 + silkworm/sync/chain_sync.hpp | 1 + silkworm/sync/sync.cpp | 19 +++++-- silkworm/sync/sync.hpp | 1 + silkworm/sync/sync_pos.cpp | 10 +++- silkworm/sync/sync_pos.hpp | 4 ++ silkworm/sync/sync_pow.cpp | 7 ++- silkworm/sync/sync_pow.hpp | 1 + 45 files changed, 279 insertions(+), 37 deletions(-) diff --git a/cmd/silkworm.cpp b/cmd/silkworm.cpp index 26750f2291..539a77e5b2 100644 --- a/cmd/silkworm.cpp +++ b/cmd/silkworm.cpp @@ -1,6 +1,10 @@ // Copyright 2025 The Silkworm Authors // SPDX-License-Identifier: Apache-2.0 +#include +#include +#include +#include #include #include #include @@ -20,8 +24,6 @@ #include #include #include -#include -#include #include #include #include @@ -33,7 +35,6 @@ using namespace silkworm; using silkworm::BlockNum; using silkworm::DataDirectory; using silkworm::human_size; -using silkworm::cmd::common::ShutdownSignal; const char* current_exception_name() { #ifdef WIN32 @@ -182,11 +183,16 @@ void parse_silkworm_command_line(CLI::App& cli, int argc, char* argv[], node::Se settings.sentry_settings.network_id = node_settings.network_id; } +Task wait_for_shutdown_signal(node::Node& execution_node, std::atomic_bool& shutdown_requested) { + co_await cmd::common::ShutdownSignal::wait(); + shutdown_requested.store(true, std::memory_order_relaxed); + SILK_INFO << "Shutdown signal observed; stopping Silkworm"; + execution_node.request_stop(); +} + // main int main(int argc, char* argv[]) { using namespace std::chrono; - using namespace silkworm::concurrency::awaitable_wait_for_one; - using namespace silkworm::concurrency::awaitable_wait_for_all; std::set_terminate([]() { try { @@ -218,6 +224,7 @@ int main(int argc, char* argv[]) { silkworm::rpc::ClientContextPool context_pool{ settings.server_settings.context_pool_settings, }; + std::atomic_bool shutdown_requested{false}; silkworm::node::Node execution_node{ context_pool, @@ -226,14 +233,46 @@ int main(int argc, char* argv[]) { // Go! auto run_future = boost::asio::co_spawn( + context_pool.any_executor(), execution_node.run(), boost::asio::use_future); + auto signal_future = boost::asio::co_spawn( context_pool.any_executor(), - execution_node.run() || ShutdownSignal::wait(), + wait_for_shutdown_signal(execution_node, shutdown_requested), boost::asio::use_future); context_pool.start(); SILK_INFO << "Silkworm is now running"; // Wait for shutdown signal or an exception from tasks - run_future.get(); + std::exception_ptr run_exception; + try { + run_future.get(); + } catch (...) { + run_exception = std::current_exception(); + } + + execution_node.request_stop(); + context_pool.stop(); + context_pool.join(); + + if (signal_future.wait_for(seconds{0}) == std::future_status::ready) { + try { + signal_future.get(); + } catch (const boost::system::system_error& ex) { + if (ex.code() != boost::system::errc::operation_canceled) { + throw; + } + } + } + + if (run_exception) { + try { + std::rethrow_exception(run_exception); + } catch (const boost::system::system_error& ex) { + if (!shutdown_requested.load(std::memory_order_relaxed) || + ex.code() != boost::system::errc::operation_canceled) { + throw; + } + } + } // Graceful exit after user shutdown signal SILK_INFO << "Exiting Silkworm"; diff --git a/silkworm/db/snapshot_sync.cpp b/silkworm/db/snapshot_sync.cpp index 3bfb7e316d..fcbf6503f2 100644 --- a/silkworm/db/snapshot_sync.cpp +++ b/silkworm/db/snapshot_sync.cpp @@ -73,6 +73,7 @@ SnapshotSync::SnapshotSync( Task SnapshotSync::run() { using namespace concurrency::awaitable_wait_for_all; + stop_requested_.store(false, std::memory_order_relaxed); [[maybe_unused]] auto _ = gsl::finally([this]() { this->is_stopping_latch_.count_down(); }); if (!settings_.enabled) { @@ -97,6 +98,11 @@ Task SnapshotSync::run() { } } +void SnapshotSync::request_stop() { + stop_requested_.store(true, std::memory_order_relaxed); + client_.stop(); +} + Task SnapshotSync::setup_and_run() { using namespace concurrency::awaitable_wait_for_all; @@ -137,7 +143,9 @@ Task SnapshotSync::setup() { // Update chain and stage progresses in database according to available snapshots datastore::kvdb::RWTxnManaged rw_txn = data_store_.chaindata.access_rw().start_rw_tx(); - update_database(rw_txn, blocks_repository().max_timestamp_available(), [this] { return is_stopping_latch_.try_wait(); }); + update_database(rw_txn, blocks_repository().max_timestamp_available(), [this] { + return stop_requested_.load(std::memory_order_relaxed) || is_stopping_latch_.try_wait(); + }); rw_txn.commit_and_stop(); if (!settings_.no_seeding) { diff --git a/silkworm/db/snapshot_sync.hpp b/silkworm/db/snapshot_sync.hpp index 22c71daa8d..fdd23c2eab 100644 --- a/silkworm/db/snapshot_sync.hpp +++ b/silkworm/db/snapshot_sync.hpp @@ -43,6 +43,7 @@ class SnapshotSync { datastore::StageScheduler& stage_scheduler); Task run(); + void request_stop(); Task download_snapshots(); Task wait_for_setup(); @@ -75,6 +76,7 @@ class SnapshotSync { db::Freezer snapshot_freezer_; datastore::SnapshotMerger snapshot_merger_; + std::atomic_bool stop_requested_{false}; std::latch is_stopping_latch_; std::atomic_bool setup_done_; concurrency::AwaitableConditionVariable setup_done_cond_var_; diff --git a/silkworm/execution/api/active_direct_service.hpp b/silkworm/execution/api/active_direct_service.hpp index 1d010e297c..0cc4f098fd 100644 --- a/silkworm/execution/api/active_direct_service.hpp +++ b/silkworm/execution/api/active_direct_service.hpp @@ -97,9 +97,10 @@ class ActiveDirectService : public DirectService, public ActiveComponent { Task block_progress() override; + bool stop() override; + protected: void execution_loop() override; - bool stop() override; private: boost::asio::io_context& ioc_; diff --git a/silkworm/execution/grpc/server/server.cpp b/silkworm/execution/grpc/server/server.cpp index bfaffbab85..d1f66fab84 100644 --- a/silkworm/execution/grpc/server/server.cpp +++ b/silkworm/execution/grpc/server/server.cpp @@ -107,4 +107,8 @@ Task Server::async_run(std::optional stack_size) { return p_impl_->async_run("exec-engine", stack_size); } +void Server::shutdown() { + p_impl_->shutdown(); +} + } // namespace silkworm::execution::grpc::server diff --git a/silkworm/execution/grpc/server/server.hpp b/silkworm/execution/grpc/server/server.hpp index c5c3d3b6e2..241a08f080 100644 --- a/silkworm/execution/grpc/server/server.hpp +++ b/silkworm/execution/grpc/server/server.hpp @@ -24,6 +24,7 @@ class Server final { Server& operator=(const Server&) = delete; Task async_run(std::optional stack_size = {}); + void shutdown(); private: std::unique_ptr p_impl_; diff --git a/silkworm/infra/concurrency/channel.hpp b/silkworm/infra/concurrency/channel.hpp index cfe0c8764f..3b2ed2afa6 100644 --- a/silkworm/infra/concurrency/channel.hpp +++ b/silkworm/infra/concurrency/channel.hpp @@ -17,6 +17,11 @@ namespace silkworm::concurrency { +inline bool is_channel_stop_error(const boost::system::error_code& error) { + return error == boost::asio::experimental::error::channel_cancelled || + error == boost::asio::experimental::error::channel_closed; +} + template class Channel { public: @@ -28,7 +33,7 @@ class Channel { try { co_await channel_.async_send(boost::system::error_code(), value, boost::asio::use_awaitable); } catch (const boost::system::system_error& ex) { - if (ex.code() == boost::asio::experimental::error::channel_cancelled) { + if (is_channel_stop_error(ex.code())) { throw boost::system::system_error(make_error_code(boost::system::errc::operation_canceled)); } throw; @@ -43,7 +48,7 @@ class Channel { try { co_return (co_await channel_.async_receive(boost::asio::use_awaitable)); } catch (const boost::system::system_error& ex) { - if (ex.code() == boost::asio::experimental::error::channel_cancelled) { + if (is_channel_stop_error(ex.code())) { throw boost::system::system_error(make_error_code(boost::system::errc::operation_canceled)); } throw; @@ -53,7 +58,7 @@ class Channel { std::optional try_receive() { std::optional result; channel_.try_receive([&](const boost::system::error_code& error, T&& value) { - if (error == boost::asio::experimental::error::channel_cancelled) { + if (is_channel_stop_error(error)) { throw boost::system::system_error(make_error_code(boost::system::errc::operation_canceled)); } if (error) { diff --git a/silkworm/infra/grpc/server/server.hpp b/silkworm/infra/grpc/server/server.hpp index 366150095a..d7759377a8 100644 --- a/silkworm/infra/grpc/server/server.hpp +++ b/silkworm/infra/grpc/server/server.hpp @@ -114,7 +114,6 @@ class Server { // Order matters here: 1) shutdown the server (immediate deadline) if (server_) { server_->Shutdown(gpr_time_0(GPR_CLOCK_REALTIME)); - server_->Wait(); } SILK_TRACE << "Server::shutdown " << this << " stopping context pool"; @@ -124,6 +123,10 @@ class Server { context_pool_->stop(); } + if (server_) { + server_->Wait(); + } + SILK_TRACE << "Server::shutdown " << this << " END"; } diff --git a/silkworm/node/node.cpp b/silkworm/node/node.cpp index 9fe069b86e..659f9424f7 100644 --- a/silkworm/node/node.cpp +++ b/silkworm/node/node.cpp @@ -48,6 +48,7 @@ class NodeImpl final { Task run(); Task run_tasks(); Task wait_for_setup(); + void request_stop(); BlockNum last_pre_validated_block() const { return chain_sync_.last_pre_validated_block(); } @@ -212,7 +213,7 @@ NodeImpl::NodeImpl( /* use_preverified_hashes = */ true, make_sync_engine_rpc_settings(settings.rpcdaemon_settings, settings.log_settings.log_verbosity), }, - resource_usage_log_{*settings_.node_settings.data_directory} { + resource_usage_log_{context_pool.any_executor(), *settings_.node_settings.data_directory} { backend_ = std::make_unique(settings_.node_settings, data_store_.chaindata().access_ro(), std::get<0>(sentry_)); backend_->set_node_name(settings_.node_settings.build_info.node_name); backend_kv_rpc_server_ = std::make_unique(settings_.server_settings, *backend_); @@ -241,6 +242,21 @@ Task NodeImpl::run() { } } +void NodeImpl::request_stop() { + snapshot_sync_.request_stop(); + chain_sync_.request_stop(); + resource_usage_log_.stop(); + execution_service_->stop(); + execution_engine_.stop(); + execution_server_.shutdown(); + if (backend_kv_rpc_server_) { + backend_kv_rpc_server_->shutdown(); + } + if (auto& sentry_server = std::get<1>(sentry_)) { + sentry_server->request_stop(); + } +} + Task NodeImpl::run_tasks() { using namespace concurrency::awaitable_wait_for_all; @@ -256,7 +272,7 @@ Task NodeImpl::run_tasks() { Task NodeImpl::run_execution_service() { // Thread running block execution requires custom stack size because of deep EVM call stacks - return execution_service_->async_run("exec-engine", /* stack_size = */ kExecutionThreadStackSize); + co_await execution_service_->async_run("exec-engine", /* stack_size = */ kExecutionThreadStackSize); } Task NodeImpl::run_execution_server() { @@ -308,4 +324,8 @@ Task Node::wait_for_setup() { return p_impl_->wait_for_setup(); } +void Node::request_stop() { + p_impl_->request_stop(); +} + } // namespace silkworm::node diff --git a/silkworm/node/node.hpp b/silkworm/node/node.hpp index efaad2ea41..835cb024df 100644 --- a/silkworm/node/node.hpp +++ b/silkworm/node/node.hpp @@ -29,6 +29,7 @@ class Node { Task run(); Task wait_for_setup(); + void request_stop(); private: std::unique_ptr p_impl_; diff --git a/silkworm/node/resource_usage.cpp b/silkworm/node/resource_usage.cpp index d73cbe10e5..e6884e761b 100644 --- a/silkworm/node/resource_usage.cpp +++ b/silkworm/node/resource_usage.cpp @@ -14,6 +14,7 @@ #include #include #include +#include namespace silkworm::node { @@ -23,14 +24,20 @@ using std::chrono::steady_clock; static constexpr std::chrono::seconds kResourceUsageInterval{300s}; Task ResourceUsageLog::run() { + using namespace concurrency::awaitable_wait_for_one; + auto executor = co_await boost::asio::this_coro::executor; boost::asio::steady_timer timer{executor}; const auto start_time = steady_clock::now(); - while (true) { + stop_requested_.store(false, std::memory_order_relaxed); + while (!stop_requested_.load(std::memory_order_relaxed)) { try { timer.expires_after(kResourceUsageInterval); - co_await timer.async_wait(boost::asio::use_awaitable); + co_await (timer.async_wait(boost::asio::use_awaitable) || stop_notifier_.wait()); + if (stop_requested_.load(std::memory_order_relaxed)) { + co_return; + } log::Info("Resource usage", {"mem", human_size(os::get_mem_usage()), "chain", human_size(data_directory_.chaindata().size()), @@ -44,4 +51,9 @@ Task ResourceUsageLog::run() { } } +void ResourceUsageLog::stop() { + stop_requested_.store(true, std::memory_order_relaxed); + stop_notifier_.notify(); +} + } // namespace silkworm::node diff --git a/silkworm/node/resource_usage.hpp b/silkworm/node/resource_usage.hpp index 529db86d78..a68c5a51b7 100644 --- a/silkworm/node/resource_usage.hpp +++ b/silkworm/node/resource_usage.hpp @@ -3,21 +3,29 @@ #pragma once +#include + #include +#include + #include +#include namespace silkworm::node { //! Log for resource usage class ResourceUsageLog { public: - explicit ResourceUsageLog(const DataDirectory& data_directory) - : data_directory_(data_directory) {} + explicit ResourceUsageLog(const boost::asio::any_io_executor& executor, const DataDirectory& data_directory) + : stop_notifier_(executor), data_directory_(data_directory) {} Task run(); + void stop(); private: + std::atomic_bool stop_requested_{false}; + concurrency::EventNotifier stop_notifier_; const DataDirectory& data_directory_; }; diff --git a/silkworm/node/stagedsync/execution_engine.cpp b/silkworm/node/stagedsync/execution_engine.cpp index 97d276a7e7..0a335a3fc7 100644 --- a/silkworm/node/stagedsync/execution_engine.cpp +++ b/silkworm/node/stagedsync/execution_engine.cpp @@ -50,6 +50,12 @@ void ExecutionEngine::close() { context_pool_.reset(); } +bool ExecutionEngine::stop() { + const bool engine_stopped = Stoppable::stop(); + const bool main_chain_stopped = main_chain_.stop(); + return engine_stopped || main_chain_stopped; +} + BlockNum ExecutionEngine::block_progress() const { return block_progress_; // main_chain_.get_block_progress() or forks block progress } diff --git a/silkworm/node/stagedsync/execution_engine.hpp b/silkworm/node/stagedsync/execution_engine.hpp index 9af7a9d12f..e2a59a3ab2 100644 --- a/silkworm/node/stagedsync/execution_engine.hpp +++ b/silkworm/node/stagedsync/execution_engine.hpp @@ -56,6 +56,7 @@ class ExecutionEngine : public execution::api::ExecutionEngine, public Stoppable // needed to circumvent mdbx threading model limitations void open() override; void close() override; + bool stop() override; // actions void insert_blocks(const std::vector>& blocks) override; diff --git a/silkworm/node/stagedsync/execution_pipeline.cpp b/silkworm/node/stagedsync/execution_pipeline.cpp index 52209ae271..329a159c0c 100644 --- a/silkworm/node/stagedsync/execution_pipeline.cpp +++ b/silkworm/node/stagedsync/execution_pipeline.cpp @@ -88,10 +88,10 @@ std::optional ExecutionPipeline::bad_block() { } bool ExecutionPipeline::stop() { - bool stopped{true}; + bool stopped{Stoppable::stop()}; for (const auto& [_, stage] : stages_) { if (!stage->is_stopping()) { - stopped &= stage->stop(); + stopped = stage->stop() || stopped; } } return stopped; diff --git a/silkworm/node/stagedsync/forks/main_chain.cpp b/silkworm/node/stagedsync/forks/main_chain.cpp index 0914750bac..5a92cf0036 100644 --- a/silkworm/node/stagedsync/forks/main_chain.cpp +++ b/silkworm/node/stagedsync/forks/main_chain.cpp @@ -129,6 +129,10 @@ void MainChain::abort() { tx_.abort(); } +bool MainChain::stop() { + return pipeline_.stop(); +} + NodeSettings& MainChain::node_settings() { return node_settings_; } diff --git a/silkworm/node/stagedsync/forks/main_chain.hpp b/silkworm/node/stagedsync/forks/main_chain.hpp index 2d31c260c7..84c271ba5d 100644 --- a/silkworm/node/stagedsync/forks/main_chain.hpp +++ b/silkworm/node/stagedsync/forks/main_chain.hpp @@ -42,6 +42,7 @@ class MainChain { void open(); // needed to circumvent mdbx threading model limitations void close(); void abort(); + bool stop(); // extension void insert_block(const Block&); diff --git a/silkworm/sentry/discovery/disc_v4/discovery.cpp b/silkworm/sentry/discovery/disc_v4/discovery.cpp index fbdd115e60..12fb68cc27 100644 --- a/silkworm/sentry/discovery/disc_v4/discovery.cpp +++ b/silkworm/sentry/discovery/disc_v4/discovery.cpp @@ -70,6 +70,12 @@ class DiscoveryImpl : private MessageHandler { discover_more_needed_notifier_.notify(); } + void stop() { + server_.stop(); + discover_more_needed_notifier_.notify(); + discovered_event_notifier_.notify(); + } + private: uint64_t local_enr_seq_num() const { return this->node_record_().seq_num; @@ -250,4 +256,8 @@ void Discovery::discover_more_needed() { p_impl_->discover_more_needed(); } +void Discovery::stop() { + p_impl_->stop(); +} + } // namespace silkworm::sentry::discovery::disc_v4 diff --git a/silkworm/sentry/discovery/disc_v4/discovery.hpp b/silkworm/sentry/discovery/disc_v4/discovery.hpp index 6676bcd7bf..d8485b4b69 100644 --- a/silkworm/sentry/discovery/disc_v4/discovery.hpp +++ b/silkworm/sentry/discovery/disc_v4/discovery.hpp @@ -34,6 +34,7 @@ class Discovery { Discovery& operator=(const Discovery&) = delete; Task run(); + void stop(); void discover_more_needed(); diff --git a/silkworm/sentry/discovery/disc_v4/server.cpp b/silkworm/sentry/discovery/disc_v4/server.cpp index 7fad9d20fd..6bf6b9673f 100644 --- a/silkworm/sentry/discovery/disc_v4/server.cpp +++ b/silkworm/sentry/discovery/disc_v4/server.cpp @@ -132,6 +132,14 @@ class ServerImpl { } } + void stop() { + boost::asio::dispatch(socket_.get_executor(), [this] { + boost::system::error_code error; + socket_.cancel(error); + socket_.close(error); + }); + } + template Task send_message(const TMessage& message, ip::udp::endpoint recipient) { return send_message(Message{TMessage::kId, message.rlp_encode()}, std::move(recipient)); @@ -191,6 +199,10 @@ Task Server::run() { return p_impl_->run(); } +void Server::stop() { + p_impl_->stop(); +} + Task Server::send_ping(ping::PingMessage message, ip::udp::endpoint recipient) { return p_impl_->send_message(message, std::move(recipient)); } diff --git a/silkworm/sentry/discovery/disc_v4/server.hpp b/silkworm/sentry/discovery/disc_v4/server.hpp index ea48b028e5..32e9c63318 100644 --- a/silkworm/sentry/discovery/disc_v4/server.hpp +++ b/silkworm/sentry/discovery/disc_v4/server.hpp @@ -33,6 +33,7 @@ class Server : public MessageSender { void setup(); Task run(); + void stop(); Task send_ping(ping::PingMessage message, boost::asio::ip::udp::endpoint recipient) override; Task send_pong(ping::PongMessage message, boost::asio::ip::udp::endpoint recipient) override; diff --git a/silkworm/sentry/discovery/discovery.cpp b/silkworm/sentry/discovery/discovery.cpp index b30945add2..cae9199f70 100644 --- a/silkworm/sentry/discovery/discovery.cpp +++ b/silkworm/sentry/discovery/discovery.cpp @@ -37,6 +37,7 @@ class DiscoveryImpl { DiscoveryImpl& operator=(const DiscoveryImpl&) = delete; Task run(); + void stop(); Task> request_peer_candidates( size_t max_count, @@ -129,6 +130,10 @@ Task DiscoveryImpl::run() { } } +void DiscoveryImpl::stop() { + disc_v4_discovery_.stop(); +} + void DiscoveryImpl::setup_node_db() { DataDirectory data_dir{data_dir_path_, true}; node_db_.setup(data_dir.nodes().path()); @@ -237,6 +242,10 @@ Task Discovery::run() { } } +void Discovery::stop() { + p_impl_->stop(); +} + Task> Discovery::request_peer_candidates( size_t max_count, std::vector exclude_urls) { diff --git a/silkworm/sentry/discovery/discovery.hpp b/silkworm/sentry/discovery/discovery.hpp index 32dba995b1..8d25be2aed 100644 --- a/silkworm/sentry/discovery/discovery.hpp +++ b/silkworm/sentry/discovery/discovery.hpp @@ -41,6 +41,7 @@ class Discovery { Discovery& operator=(const Discovery&) = delete; Task run(); + void stop(); struct PeerCandidate { EnodeUrl url; diff --git a/silkworm/sentry/grpc/server/server.cpp b/silkworm/sentry/grpc/server/server.cpp index f85f294d49..3d5c0aabdb 100644 --- a/silkworm/sentry/grpc/server/server.cpp +++ b/silkworm/sentry/grpc/server/server.cpp @@ -111,4 +111,8 @@ Task Server::async_run() { return p_impl_->async_run("sentry-gsrv"); } +void Server::shutdown() { + p_impl_->shutdown(); +} + } // namespace silkworm::sentry::grpc::server diff --git a/silkworm/sentry/grpc/server/server.hpp b/silkworm/sentry/grpc/server/server.hpp index 9acfcfaa9f..9a2c0efff4 100644 --- a/silkworm/sentry/grpc/server/server.hpp +++ b/silkworm/sentry/grpc/server/server.hpp @@ -25,6 +25,7 @@ class Server final { Server& operator=(const Server&) = delete; Task async_run(); + void shutdown(); private: std::unique_ptr p_impl_; diff --git a/silkworm/sentry/message_receiver.cpp b/silkworm/sentry/message_receiver.cpp index 513f6df52a..f0b66ddb71 100644 --- a/silkworm/sentry/message_receiver.cpp +++ b/silkworm/sentry/message_receiver.cpp @@ -40,6 +40,13 @@ Task MessageReceiver::run(std::shared_ptr self, PeerManag } } +void MessageReceiver::stop() { + message_calls_channel_.close(); + for (auto& subscription : subscriptions_) { + subscription.messages_channel->close(); + } +} + Task MessageReceiver::handle_calls() { auto executor = co_await this_coro::executor; diff --git a/silkworm/sentry/message_receiver.hpp b/silkworm/sentry/message_receiver.hpp index ccf765d961..4c08865032 100644 --- a/silkworm/sentry/message_receiver.hpp +++ b/silkworm/sentry/message_receiver.hpp @@ -38,6 +38,7 @@ class MessageReceiver : public PeerManagerObserver { } static Task run(std::shared_ptr self, PeerManager& peer_manager); + void stop(); private: Task handle_calls(); diff --git a/silkworm/sentry/message_sender.hpp b/silkworm/sentry/message_sender.hpp index 1c6858ab5d..a603bf0807 100644 --- a/silkworm/sentry/message_sender.hpp +++ b/silkworm/sentry/message_sender.hpp @@ -24,6 +24,7 @@ class MessageSender { } Task run(PeerManager& peer_manager); + void stop() { send_message_channel_.close(); } private: concurrency::Channel send_message_channel_; diff --git a/silkworm/sentry/peer_discovery_feedback.hpp b/silkworm/sentry/peer_discovery_feedback.hpp index 5af07ada96..9d29211759 100644 --- a/silkworm/sentry/peer_discovery_feedback.hpp +++ b/silkworm/sentry/peer_discovery_feedback.hpp @@ -28,6 +28,7 @@ class PeerDiscoveryFeedback : public PeerManagerObserver { std::shared_ptr self, PeerManager& peer_manager, discovery::Discovery& discovery); + void stop() { peer_disconnected_events_.close(); } private: // PeerManagerObserver diff --git a/silkworm/sentry/peer_manager.cpp b/silkworm/sentry/peer_manager.cpp index a257406e5d..52580be57a 100644 --- a/silkworm/sentry/peer_manager.cpp +++ b/silkworm/sentry/peer_manager.cpp @@ -152,6 +152,17 @@ void PeerManager::add_observer(std::weak_ptr observer) { observers_.push_back(std::move(observer)); } +void PeerManager::stop() { + client_peer_channel_.close(); + need_peers_notifier_.notify(); + for (auto& peer : peers_) { + peer->disconnect(rlpx::DisconnectReason::kDisconnectRequested); + } + for (auto& peer : handshaking_peers_) { + peer->disconnect(rlpx::DisconnectReason::kDisconnectRequested); + } +} + std::list> PeerManager::observers() { std::scoped_lock lock(observers_mutex_); std::list> observers; diff --git a/silkworm/sentry/peer_manager.hpp b/silkworm/sentry/peer_manager.hpp index afbf1cbda7..b6e84e21c2 100644 --- a/silkworm/sentry/peer_manager.hpp +++ b/silkworm/sentry/peer_manager.hpp @@ -60,6 +60,7 @@ class PeerManager { Task enumerate_random_peers(size_t max_count, EnumeratePeersCallback callback); void add_observer(std::weak_ptr observer); + void stop(); private: Task run_in_strand(concurrency::Channel>& peer_channel); diff --git a/silkworm/sentry/peer_manager_api.cpp b/silkworm/sentry/peer_manager_api.cpp index 0fa55217f8..c7dae8d2ee 100644 --- a/silkworm/sentry/peer_manager_api.cpp +++ b/silkworm/sentry/peer_manager_api.cpp @@ -37,6 +37,18 @@ Task PeerManagerApi::run(std::shared_ptr self) { co_await concurrency::spawn_task(self->strand_, std::move(run)); } +void PeerManagerApi::stop() { + peer_count_calls_channel_.close(); + peers_calls_channel_.close(); + peer_calls_channel_.close(); + peer_penalize_calls_channel_.close(); + peer_events_calls_channel_.close(); + peer_events_channel_.close(); + for (auto& subscription : events_subscriptions_) { + subscription.events_channel->close(); + } +} + Task PeerManagerApi::handle_peer_count_calls() { // loop until receive() throws a cancelled exception while (true) { diff --git a/silkworm/sentry/peer_manager_api.hpp b/silkworm/sentry/peer_manager_api.hpp index 290829b95e..e72f7cb4fc 100644 --- a/silkworm/sentry/peer_manager_api.hpp +++ b/silkworm/sentry/peer_manager_api.hpp @@ -44,6 +44,7 @@ class PeerManagerApi : public PeerManagerObserver { peer_events_channel_(executor, 1000) {} static Task run(std::shared_ptr self); + void stop(); template using Channel = concurrency::Channel; diff --git a/silkworm/sentry/rlpx/server.cpp b/silkworm/sentry/rlpx/server.cpp index d7a2f6d897..78fba5ca48 100644 --- a/silkworm/sentry/rlpx/server.cpp +++ b/silkworm/sentry/rlpx/server.cpp @@ -65,6 +65,11 @@ Task Server::run( try { co_await acceptor.async_accept(stream.socket(), use_awaitable); } catch (const boost::system::system_error& ex) { + if (ex.code() == error::operation_aborted || + ex.code() == boost::system::errc::operation_canceled) { + SILK_DEBUG_M("sentry") << "Sentry RLPx server accept cancelled"; + throw; + } if (ex.code() == boost::system::errc::invalid_argument) { SILK_ERROR_M("sentry") << "Sentry RLPx server got invalid_argument on accept port=" << port_; continue; diff --git a/silkworm/sentry/rlpx/server.hpp b/silkworm/sentry/rlpx/server.hpp index c77f2f6764..e75fbe4074 100644 --- a/silkworm/sentry/rlpx/server.hpp +++ b/silkworm/sentry/rlpx/server.hpp @@ -32,6 +32,7 @@ class Server final { EccKeyPair node_key, std::string client_id, std::function()> protocol_factory); + void stop() { peer_channel_.close(); } const boost::asio::ip::address& ip() const { return ip_; } boost::asio::ip::tcp::endpoint listen_endpoint() const; diff --git a/silkworm/sentry/sentry.cpp b/silkworm/sentry/sentry.cpp index 3dc4a2a95a..af0627f106 100644 --- a/silkworm/sentry/sentry.cpp +++ b/silkworm/sentry/sentry.cpp @@ -49,6 +49,7 @@ class SentryImpl final { SentryImpl& operator=(const SentryImpl&) = delete; Task run(); + void request_stop(); std::shared_ptr service() { return direct_service_; } @@ -172,6 +173,18 @@ Task SentryImpl::run() { } } +void SentryImpl::request_stop() { + status_manager_.stop(); + rlpx_server_.stop(); + discovery_.stop(); + peer_manager_.stop(); + message_sender_.stop(); + message_receiver_->stop(); + peer_manager_api_->stop(); + peer_discovery_feedback_->stop(); + grpc_server_.shutdown(); +} + void SentryImpl::setup_node_key() { DataDirectory data_dir{settings_.data_dir_path, true}; NodeKey node_key = node_key_get_or_generate(settings_.node_key, data_dir); @@ -214,11 +227,11 @@ std::function()> SentryImpl::protocol_factory() } Task SentryImpl::run_status_manager() { - return status_manager_.run(); + co_await status_manager_.run(); } Task SentryImpl::run_server() { - return rlpx_server_.run(executor_pool_, node_key_.value(), client_id(), protocol_factory()); + co_await rlpx_server_.run(executor_pool_, node_key_.value(), client_id(), protocol_factory()); } std::unique_ptr SentryImpl::make_client() { @@ -235,12 +248,12 @@ std::function()> SentryImpl::client_factory() { } Task SentryImpl::run_discovery() { - return discovery_.run(); + co_await discovery_.run(); } Task SentryImpl::run_peer_manager() { try { - return peer_manager_.run(rlpx_server_, discovery_, make_protocol(), client_factory()); + co_await peer_manager_.run(rlpx_server_, discovery_, make_protocol(), client_factory()); } catch (const boost::system::system_error& se) { if (se.code() == boost::system::errc::operation_canceled) { SILK_DEBUG_M("sentry") << "run_peer_manager unexpected end [operation_canceled]"; @@ -253,7 +266,7 @@ Task SentryImpl::run_peer_manager() { Task SentryImpl::run_message_sender() { try { - return message_sender_.run(peer_manager_); + co_await message_sender_.run(peer_manager_); } catch (const boost::system::system_error& se) { if (se.code() == boost::system::errc::operation_canceled) { SILK_DEBUG_M("sentry") << "run_message_sender unexpected end [operation_canceled]"; @@ -266,7 +279,7 @@ Task SentryImpl::run_message_sender() { Task SentryImpl::run_message_receiver() { try { - return MessageReceiver::run(message_receiver_, peer_manager_); + co_await MessageReceiver::run(message_receiver_, peer_manager_); } catch (const boost::system::system_error& se) { if (se.code() == boost::system::errc::operation_canceled) { SILK_DEBUG_M("sentry") << "run_message_receiver unexpected end [operation_canceled]"; @@ -279,7 +292,7 @@ Task SentryImpl::run_message_receiver() { Task SentryImpl::run_peer_manager_api() { try { - return PeerManagerApi::run(peer_manager_api_); + co_await PeerManagerApi::run(peer_manager_api_); } catch (const boost::system::system_error& se) { if (se.code() == boost::system::errc::operation_canceled) { SILK_DEBUG_M("sentry") << "run_peer_manager_api unexpected end [operation_canceled]"; @@ -292,7 +305,7 @@ Task SentryImpl::run_peer_manager_api() { Task SentryImpl::run_peer_discovery_feedback() { try { - return PeerDiscoveryFeedback::run(peer_discovery_feedback_, peer_manager_, discovery_); + co_await PeerDiscoveryFeedback::run(peer_discovery_feedback_, peer_manager_, discovery_); } catch (const boost::system::system_error& se) { if (se.code() == boost::system::errc::operation_canceled) { SILK_DEBUG_M("sentry") << "run_peer_discovery_feedback unexpected end [operation_canceled]"; @@ -388,6 +401,10 @@ Task Sentry::run() { return p_impl_->run(); } +void Sentry::request_stop() { + p_impl_->request_stop(); +} + Task> Sentry::service() { co_return p_impl_->service(); } diff --git a/silkworm/sentry/sentry.hpp b/silkworm/sentry/sentry.hpp index 421f428be6..e70323efb6 100644 --- a/silkworm/sentry/sentry.hpp +++ b/silkworm/sentry/sentry.hpp @@ -28,6 +28,7 @@ class Sentry final : public api::SentryClient { Sentry& operator=(const Sentry&) = delete; Task run(); + void request_stop(); Task> service() override; bool is_ready() override; diff --git a/silkworm/sentry/status_manager.hpp b/silkworm/sentry/status_manager.hpp index 941db1d5e1..1bd731abfd 100644 --- a/silkworm/sentry/status_manager.hpp +++ b/silkworm/sentry/status_manager.hpp @@ -23,6 +23,7 @@ class StatusManager { Task wait_for_status(); Task run(); + void stop() { status_channel_.close(); } concurrency::Channel& status_channel() { return status_channel_; diff --git a/silkworm/sync/chain_sync.hpp b/silkworm/sync/chain_sync.hpp index d034ad80f7..58f113207d 100644 --- a/silkworm/sync/chain_sync.hpp +++ b/silkworm/sync/chain_sync.hpp @@ -20,6 +20,7 @@ class ChainSync { ChainSync& operator=(const ChainSync&) = delete; virtual Task async_run() = 0; + virtual void request_stop() { block_exchange_.stop_downloading(); } protected: IBlockExchange& block_exchange_; diff --git a/silkworm/sync/sync.cpp b/silkworm/sync/sync.cpp index 99bd48b260..aea892c177 100644 --- a/silkworm/sync/sync.cpp +++ b/silkworm/sync/sync.cpp @@ -59,7 +59,15 @@ BlockNum Sync::last_pre_validated_block() const { Task Sync::async_run() { using namespace concurrency::awaitable_wait_for_all; - return (run_tasks() && start_engine_rpc_server()); + co_await (run_tasks() && start_engine_rpc_server()); +} + +void Sync::request_stop() { + block_exchange_.stop_downloading(); + block_exchange_.stop(); + if (chain_sync_) { + chain_sync_->request_stop(); + } } Task Sync::run_tasks() { @@ -68,22 +76,23 @@ Task Sync::run_tasks() { } Task Sync::start_sync_sentry_client() { - return sync_sentry_client_.async_run(); + co_await sync_sentry_client_.async_run(); } Task Sync::start_block_exchange() { - return block_exchange_.async_run("block-exchg"); + co_await block_exchange_.async_run("block-exchg"); } Task Sync::start_chain_sync() { if (!engine_rpc_server_) { - return chain_sync_->async_run(); + co_await chain_sync_->async_run(); + co_return; } // The ChainSync async loop *must* run onto the Engine RPC server unique execution context // This is *strictly* required by the current design assumptions in PoSSync auto& ioc = engine_rpc_server_->context_pool().next_ioc(); - return boost::asio::co_spawn(ioc, chain_sync_->async_run(), boost::asio::use_awaitable); + co_await boost::asio::co_spawn(ioc, chain_sync_->async_run(), boost::asio::use_awaitable); } Task Sync::start_engine_rpc_server() { diff --git a/silkworm/sync/sync.hpp b/silkworm/sync/sync.hpp index acdb804852..3a6f618394 100644 --- a/silkworm/sync/sync.hpp +++ b/silkworm/sync/sync.hpp @@ -38,6 +38,7 @@ class Sync { Sync& operator=(const Sync&) = delete; Task async_run(); + void request_stop(); BlockNum last_pre_validated_block() const; diff --git a/silkworm/sync/sync_pos.cpp b/silkworm/sync/sync_pos.cpp index eb114f0b92..1dd333ef4d 100644 --- a/silkworm/sync/sync_pos.cpp +++ b/silkworm/sync/sync_pos.cpp @@ -37,9 +37,15 @@ PoSSync::PoSSync(IBlockExchange& block_exchange, execution::api::Client& exec_cl : ChainSync(block_exchange, exec_client) {} Task PoSSync::async_run() { + stop_requested_.store(false, std::memory_order_relaxed); co_await download_blocks(); } +void PoSSync::request_stop() { + stop_requested_.store(true, std::memory_order_relaxed); + block_exchange_.stop_downloading(); +} + // Wait for blocks arrival from BlockExchange and insert them into ExecutionEngine Task PoSSync::download_blocks() { using namespace std::chrono_literals; @@ -71,7 +77,7 @@ Task PoSSync::download_blocks() { // main loop try { - while (true) { + while (!stop_requested_.load(std::memory_order_relaxed)) { Blocks blocks; // wait for a batch of blocks @@ -430,4 +436,4 @@ Task PoSSync::get_payload_bodies_by_range(BlockNum co_return payload_bodies; } -} // namespace silkworm::chainsync \ No newline at end of file +} // namespace silkworm::chainsync diff --git a/silkworm/sync/sync_pos.hpp b/silkworm/sync/sync_pos.hpp index a45c604453..a0ed184545 100644 --- a/silkworm/sync/sync_pos.hpp +++ b/silkworm/sync/sync_pos.hpp @@ -3,6 +3,8 @@ #pragma once +#include + #include #include @@ -22,6 +24,7 @@ class PoSSync : public ChainSync, public rpc::engine::ExecutionEngine { PoSSync(IBlockExchange&, execution::api::Client&); Task async_run() override; + void request_stop() override; // public interface to download blocks Task download_blocks(); /*[[long_running]]*/ @@ -38,6 +41,7 @@ class PoSSync : public ChainSync, public rpc::engine::ExecutionEngine { std::tuple has_valid_ancestor(const Hash& block_hash); size_t active_chain_validations_{0}; + std::atomic_bool stop_requested_{false}; }; } // namespace silkworm::chainsync diff --git a/silkworm/sync/sync_pow.cpp b/silkworm/sync/sync_pow.cpp index 7a14adb64c..bab81fdffc 100644 --- a/silkworm/sync/sync_pow.cpp +++ b/silkworm/sync/sync_pow.cpp @@ -23,6 +23,11 @@ Task PoWSync::async_run() { return ActiveComponent::async_run("pow-sync-ex"); } +void PoWSync::request_stop() { + block_exchange_.stop_downloading(); + stop(); +} + BlockId PoWSync::resume() { // find the point (head) where we left off BlockId head{}; @@ -219,4 +224,4 @@ void PoWSync::send_new_block_announcements(Blocks blocks) { block_exchange_.accept(message); } -} // namespace silkworm::chainsync \ No newline at end of file +} // namespace silkworm::chainsync diff --git a/silkworm/sync/sync_pow.hpp b/silkworm/sync/sync_pow.hpp index ed78b4040b..059e0ca30f 100644 --- a/silkworm/sync/sync_pow.hpp +++ b/silkworm/sync/sync_pow.hpp @@ -23,6 +23,7 @@ class PoWSync : public ChainSync, ActiveComponent { PoWSync(IBlockExchange&, execution::api::Client&); Task async_run() override; + void request_stop() override; void execution_loop() final; /*[[long_running]]*/