Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 46 additions & 7 deletions cmd/silkworm.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
// Copyright 2025 The Silkworm Authors
// SPDX-License-Identifier: Apache-2.0

#include <atomic>
#include <chrono>
#include <cstdlib>
#include <future>
#include <memory>
#include <optional>
#include <stdexcept>
Expand All @@ -20,8 +24,6 @@
#include <silkworm/infra/cli/common.hpp>
#include <silkworm/infra/cli/shutdown_signal.hpp>
#include <silkworm/infra/common/log.hpp>
#include <silkworm/infra/concurrency/awaitable_wait_for_all.hpp>
#include <silkworm/infra/concurrency/awaitable_wait_for_one.hpp>
#include <silkworm/infra/grpc/client/client_context_pool.hpp>
#include <silkworm/node/cli/node_options.hpp>
#include <silkworm/node/node.hpp>
Expand All @@ -33,7 +35,6 @@ using namespace silkworm;
using silkworm::BlockNum;
using silkworm::DataDirectory;
using silkworm::human_size;
using silkworm::cmd::common::ShutdownSignal;

const char* current_exception_name() {
#ifdef WIN32
Expand Down Expand Up @@ -182,11 +183,16 @@ void parse_silkworm_command_line(CLI::App& cli, int argc, char* argv[], node::Se
settings.sentry_settings.network_id = node_settings.network_id;
}

Task<void> wait_for_shutdown_signal(node::Node& execution_node, std::atomic_bool& shutdown_requested) {
co_await cmd::common::ShutdownSignal::wait();
shutdown_requested.store(true, std::memory_order_relaxed);
SILK_INFO << "Shutdown signal observed; stopping Silkworm";
execution_node.request_stop();
}

// main
int main(int argc, char* argv[]) {
using namespace std::chrono;
using namespace silkworm::concurrency::awaitable_wait_for_one;
using namespace silkworm::concurrency::awaitable_wait_for_all;

std::set_terminate([]() {
try {
Expand Down Expand Up @@ -218,6 +224,7 @@ int main(int argc, char* argv[]) {
silkworm::rpc::ClientContextPool context_pool{
settings.server_settings.context_pool_settings,
};
std::atomic_bool shutdown_requested{false};

silkworm::node::Node execution_node{
context_pool,
Expand All @@ -226,14 +233,46 @@ int main(int argc, char* argv[]) {

// Go!
auto run_future = boost::asio::co_spawn(
context_pool.any_executor(), execution_node.run(), boost::asio::use_future);
auto signal_future = boost::asio::co_spawn(
context_pool.any_executor(),
execution_node.run() || ShutdownSignal::wait(),
wait_for_shutdown_signal(execution_node, shutdown_requested),
boost::asio::use_future);
context_pool.start();
SILK_INFO << "Silkworm is now running";

// Wait for shutdown signal or an exception from tasks
run_future.get();
std::exception_ptr run_exception;
try {
run_future.get();
} catch (...) {
run_exception = std::current_exception();
}

execution_node.request_stop();
context_pool.stop();
context_pool.join();

if (signal_future.wait_for(seconds{0}) == std::future_status::ready) {
try {
signal_future.get();
} catch (const boost::system::system_error& ex) {
if (ex.code() != boost::system::errc::operation_canceled) {
throw;
}
}
}

if (run_exception) {
try {
std::rethrow_exception(run_exception);
} catch (const boost::system::system_error& ex) {
if (!shutdown_requested.load(std::memory_order_relaxed) ||
ex.code() != boost::system::errc::operation_canceled) {
throw;
}
}
}

// Graceful exit after user shutdown signal
SILK_INFO << "Exiting Silkworm";
Expand Down
10 changes: 9 additions & 1 deletion silkworm/db/snapshot_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ SnapshotSync::SnapshotSync(
Task<void> SnapshotSync::run() {
using namespace concurrency::awaitable_wait_for_all;

stop_requested_.store(false, std::memory_order_relaxed);
[[maybe_unused]] auto _ = gsl::finally([this]() { this->is_stopping_latch_.count_down(); });

if (!settings_.enabled) {
Expand All @@ -97,6 +98,11 @@ Task<void> SnapshotSync::run() {
}
}

void SnapshotSync::request_stop() {
stop_requested_.store(true, std::memory_order_relaxed);
client_.stop();
}

Task<void> SnapshotSync::setup_and_run() {
using namespace concurrency::awaitable_wait_for_all;

Expand Down Expand Up @@ -137,7 +143,9 @@ Task<void> SnapshotSync::setup() {

// Update chain and stage progresses in database according to available snapshots
datastore::kvdb::RWTxnManaged rw_txn = data_store_.chaindata.access_rw().start_rw_tx();
update_database(rw_txn, blocks_repository().max_timestamp_available(), [this] { return is_stopping_latch_.try_wait(); });
update_database(rw_txn, blocks_repository().max_timestamp_available(), [this] {
return stop_requested_.load(std::memory_order_relaxed) || is_stopping_latch_.try_wait();
});
rw_txn.commit_and_stop();

if (!settings_.no_seeding) {
Expand Down
2 changes: 2 additions & 0 deletions silkworm/db/snapshot_sync.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class SnapshotSync {
datastore::StageScheduler& stage_scheduler);

Task<void> run();
void request_stop();

Task<void> download_snapshots();
Task<void> wait_for_setup();
Expand Down Expand Up @@ -75,6 +76,7 @@ class SnapshotSync {
db::Freezer snapshot_freezer_;
datastore::SnapshotMerger snapshot_merger_;

std::atomic_bool stop_requested_{false};
std::latch is_stopping_latch_;
std::atomic_bool setup_done_;
concurrency::AwaitableConditionVariable setup_done_cond_var_;
Expand Down
3 changes: 2 additions & 1 deletion silkworm/execution/api/active_direct_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,10 @@ class ActiveDirectService : public DirectService, public ActiveComponent {

Task<BlockNum> block_progress() override;

bool stop() override;

protected:
void execution_loop() override;
bool stop() override;

private:
boost::asio::io_context& ioc_;
Expand Down
4 changes: 4 additions & 0 deletions silkworm/execution/grpc/server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,8 @@ Task<void> Server::async_run(std::optional<size_t> stack_size) {
return p_impl_->async_run("exec-engine", stack_size);
}

void Server::shutdown() {
p_impl_->shutdown();
}

} // namespace silkworm::execution::grpc::server
1 change: 1 addition & 0 deletions silkworm/execution/grpc/server/server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class Server final {
Server& operator=(const Server&) = delete;

Task<void> async_run(std::optional<size_t> stack_size = {});
void shutdown();

private:
std::unique_ptr<ServerImpl> p_impl_;
Expand Down
11 changes: 8 additions & 3 deletions silkworm/infra/concurrency/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@

namespace silkworm::concurrency {

inline bool is_channel_stop_error(const boost::system::error_code& error) {
return error == boost::asio::experimental::error::channel_cancelled ||
error == boost::asio::experimental::error::channel_closed;
}

template <typename T>
class Channel {
public:
Expand All @@ -28,7 +33,7 @@ class Channel {
try {
co_await channel_.async_send(boost::system::error_code(), value, boost::asio::use_awaitable);
} catch (const boost::system::system_error& ex) {
if (ex.code() == boost::asio::experimental::error::channel_cancelled) {
if (is_channel_stop_error(ex.code())) {
throw boost::system::system_error(make_error_code(boost::system::errc::operation_canceled));
}
throw;
Expand All @@ -43,7 +48,7 @@ class Channel {
try {
co_return (co_await channel_.async_receive(boost::asio::use_awaitable));
} catch (const boost::system::system_error& ex) {
if (ex.code() == boost::asio::experimental::error::channel_cancelled) {
if (is_channel_stop_error(ex.code())) {
throw boost::system::system_error(make_error_code(boost::system::errc::operation_canceled));
}
throw;
Expand All @@ -53,7 +58,7 @@ class Channel {
std::optional<T> try_receive() {
std::optional<T> result;
channel_.try_receive([&](const boost::system::error_code& error, T&& value) {
if (error == boost::asio::experimental::error::channel_cancelled) {
if (is_channel_stop_error(error)) {
throw boost::system::system_error(make_error_code(boost::system::errc::operation_canceled));
}
if (error) {
Expand Down
5 changes: 4 additions & 1 deletion silkworm/infra/grpc/server/server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ class Server {
// Order matters here: 1) shutdown the server (immediate deadline)
if (server_) {
server_->Shutdown(gpr_time_0(GPR_CLOCK_REALTIME));
server_->Wait();
}

SILK_TRACE << "Server::shutdown " << this << " stopping context pool";
Expand All @@ -124,6 +123,10 @@ class Server {
context_pool_->stop();
}

if (server_) {
server_->Wait();
}

SILK_TRACE << "Server::shutdown " << this << " END";
}

Expand Down
24 changes: 22 additions & 2 deletions silkworm/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class NodeImpl final {
Task<void> run();
Task<void> run_tasks();
Task<void> wait_for_setup();
void request_stop();

BlockNum last_pre_validated_block() const { return chain_sync_.last_pre_validated_block(); }

Expand Down Expand Up @@ -212,7 +213,7 @@ NodeImpl::NodeImpl(
/* use_preverified_hashes = */ true,
make_sync_engine_rpc_settings(settings.rpcdaemon_settings, settings.log_settings.log_verbosity),
},
resource_usage_log_{*settings_.node_settings.data_directory} {
resource_usage_log_{context_pool.any_executor(), *settings_.node_settings.data_directory} {
backend_ = std::make_unique<EthereumBackEnd>(settings_.node_settings, data_store_.chaindata().access_ro(), std::get<0>(sentry_));
backend_->set_node_name(settings_.node_settings.build_info.node_name);
backend_kv_rpc_server_ = std::make_unique<BackEndKvServer>(settings_.server_settings, *backend_);
Expand Down Expand Up @@ -241,6 +242,21 @@ Task<void> NodeImpl::run() {
}
}

void NodeImpl::request_stop() {
snapshot_sync_.request_stop();
chain_sync_.request_stop();
resource_usage_log_.stop();
execution_service_->stop();
execution_engine_.stop();
execution_server_.shutdown();
if (backend_kv_rpc_server_) {
backend_kv_rpc_server_->shutdown();
}
if (auto& sentry_server = std::get<1>(sentry_)) {
sentry_server->request_stop();
}
}

Task<void> NodeImpl::run_tasks() {
using namespace concurrency::awaitable_wait_for_all;

Expand All @@ -256,7 +272,7 @@ Task<void> NodeImpl::run_tasks() {

Task<void> NodeImpl::run_execution_service() {
// Thread running block execution requires custom stack size because of deep EVM call stacks
return execution_service_->async_run("exec-engine", /* stack_size = */ kExecutionThreadStackSize);
co_await execution_service_->async_run("exec-engine", /* stack_size = */ kExecutionThreadStackSize);
}

Task<void> NodeImpl::run_execution_server() {
Expand Down Expand Up @@ -308,4 +324,8 @@ Task<void> Node::wait_for_setup() {
return p_impl_->wait_for_setup();
}

void Node::request_stop() {
p_impl_->request_stop();
}

} // namespace silkworm::node
1 change: 1 addition & 0 deletions silkworm/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class Node {

Task<void> run();
Task<void> wait_for_setup();
void request_stop();

private:
std::unique_ptr<NodeImpl> p_impl_;
Expand Down
16 changes: 14 additions & 2 deletions silkworm/node/resource_usage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <silkworm/infra/common/log.hpp>
#include <silkworm/infra/common/mem_usage.hpp>
#include <silkworm/infra/common/stopwatch.hpp>
#include <silkworm/infra/concurrency/awaitable_wait_for_one.hpp>

namespace silkworm::node {

Expand All @@ -23,14 +24,20 @@ using std::chrono::steady_clock;
static constexpr std::chrono::seconds kResourceUsageInterval{300s};

Task<void> ResourceUsageLog::run() {
using namespace concurrency::awaitable_wait_for_one;

auto executor = co_await boost::asio::this_coro::executor;
boost::asio::steady_timer timer{executor};

const auto start_time = steady_clock::now();
while (true) {
stop_requested_.store(false, std::memory_order_relaxed);
while (!stop_requested_.load(std::memory_order_relaxed)) {
try {
timer.expires_after(kResourceUsageInterval);
co_await timer.async_wait(boost::asio::use_awaitable);
co_await (timer.async_wait(boost::asio::use_awaitable) || stop_notifier_.wait());
if (stop_requested_.load(std::memory_order_relaxed)) {
co_return;
}

log::Info("Resource usage", {"mem", human_size(os::get_mem_usage()),
"chain", human_size(data_directory_.chaindata().size()),
Expand All @@ -44,4 +51,9 @@ Task<void> ResourceUsageLog::run() {
}
}

void ResourceUsageLog::stop() {
stop_requested_.store(true, std::memory_order_relaxed);
stop_notifier_.notify();
}

} // namespace silkworm::node
12 changes: 10 additions & 2 deletions silkworm/node/resource_usage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,29 @@

#pragma once

#include <atomic>

#include <silkworm/infra/concurrency/task.hpp>

#include <boost/asio/any_io_executor.hpp>

#include <silkworm/infra/common/directories.hpp>
#include <silkworm/infra/concurrency/event_notifier.hpp>

namespace silkworm::node {

//! Log for resource usage
class ResourceUsageLog {
public:
explicit ResourceUsageLog(const DataDirectory& data_directory)
: data_directory_(data_directory) {}
explicit ResourceUsageLog(const boost::asio::any_io_executor& executor, const DataDirectory& data_directory)
: stop_notifier_(executor), data_directory_(data_directory) {}

Task<void> run();
void stop();

private:
std::atomic_bool stop_requested_{false};
concurrency::EventNotifier stop_notifier_;
const DataDirectory& data_directory_;
};

Expand Down
6 changes: 6 additions & 0 deletions silkworm/node/stagedsync/execution_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ void ExecutionEngine::close() {
context_pool_.reset();
}

bool ExecutionEngine::stop() {
const bool engine_stopped = Stoppable::stop();
const bool main_chain_stopped = main_chain_.stop();
return engine_stopped || main_chain_stopped;
}

BlockNum ExecutionEngine::block_progress() const {
return block_progress_; // main_chain_.get_block_progress() or forks block progress
}
Expand Down
1 change: 1 addition & 0 deletions silkworm/node/stagedsync/execution_engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class ExecutionEngine : public execution::api::ExecutionEngine, public Stoppable
// needed to circumvent mdbx threading model limitations
void open() override;
void close() override;
bool stop() override;

// actions
void insert_blocks(const std::vector<std::shared_ptr<Block>>& blocks) override;
Expand Down
Loading