Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,14 @@ configuration::configuration()
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
0,
{.min = 0, .max = 100_MiB})
, raft_max_load_local_snapshots_per_shard(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we already have a "maximum recoveries per shard" config

Are we double dipping on our configuration options?

*this,
"raft_max_load_local_snapshots_per_shard",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix description

"This is the maximum number of snapshots that may be loaded from the "
"disk, typically during recovery.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
std::numeric_limits<uint32_t>::max(),
{.min = 1, .max = std::numeric_limits<uint32_t>::max()})
, enable_usage(
*this,
"enable_usage",
Expand Down
2 changes: 2 additions & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ struct configuration final : public config_store {
raft_max_inflight_follower_append_entries_requests_per_shard;
bounded_property<size_t>
raft_max_buffered_follower_append_entries_bytes_per_shard;
bounded_property<uint32_t> raft_max_load_local_snapshots_per_shard;

// Kafka
property<bool> enable_usage;
bounded_property<size_t> usage_num_windows;
Expand Down
1 change: 1 addition & 0 deletions src/v/raft/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ redpanda_cc_library(
"//src/v/storage:record_batch_builder",
"//src/v/storage:record_batch_utils",
"//src/v/utils:absl_sstring_hash",
"//src/v/utils:adjustable_semaphore",
"//src/v/utils:delta_for",
"//src/v/utils:expiring_promise",
"//src/v/utils:human",
Expand Down
26 changes: 25 additions & 1 deletion src/v/raft/persisted_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,31 @@
#include "raft/persisted_stm.h"

#include "bytes/iostream.h"
#include "config/configuration.h"
#include "raft/consensus.h"
#include "raft/state_machine_base.h"
#include "ssx/future-util.h"
#include "ssx/sformat.h"
#include "storage/api.h"
#include "storage/kvstore.h"
#include "storage/snapshot.h"
#include "utils/adjustable_semaphore.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>

#include <exception>
#include <filesystem>
#include <limits>
namespace raft {
namespace {

// Shard-local (thread_local) semaphore which caps the number of snapshots that
// stms are allowed to load at once on this shard.
//
// It is created with a capacity of std::numeric_limits<uint32_t>::max(), the
// same value used as the default of raft_max_load_local_snapshots_per_shard.
// load_local_snapshot() re-applies the currently configured value through
// set_capacity() each time it is called, so this initial capacity only
// matters until the first snapshot load on the shard.
thread_local adjustable_semaphore max_par_local_snapshots{
std::numeric_limits<uint32_t>::max()};

std::optional<raft::stm_snapshot_header> read_snapshot_header(
iobuf_parser& parser, const model::ntp& ntp, const ss::sstring& name) {
auto version = reflection::adl<int8_t>{}.from(parser);
Expand Down Expand Up @@ -77,7 +85,23 @@ ss::future<> persisted_stm_base<BaseT, T>::apply(
template<typename BaseT, supported_stm_snapshot T>
ss::future<std::optional<stm_snapshot>>
persisted_stm_base<BaseT, T>::load_local_snapshot() {
    // Re-read the tunable on every call so a runtime configuration change is
    // applied to the shard-local limiter before this load queues on it.
    const uint32_t configured_limit
      = config::shard_local_cfg()
          .raft_max_load_local_snapshots_per_shard.value();
    max_par_local_snapshots.set_capacity(
      static_cast<uint64_t>(configured_limit));

    // Take one slot from the limiter, then start the backend load. The
    // acquired units are owned by the trailing continuation, so they are only
    // handed back once the backend's future has resolved (successfully or
    // not); the loaded snapshot itself is forwarded to the caller untouched.
    return max_par_local_snapshots.get_units(1).then(
      [this](auto held_units) {
          return _snapshot_backend.load_snapshot().finally(
            [held_units = std::move(held_units)] {});
      });
}
template<typename BaseT, supported_stm_snapshot T>
ss::future<> persisted_stm_base<BaseT, T>::stop() {
Expand Down
19 changes: 17 additions & 2 deletions src/v/utils/adjustable_semaphore.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@
#pragma once

#include "base/seastarx.h"
#include "base/vassert.h"
#include "ssx/semaphore.h"

#include <limits>

/**
* This class is an extension of ss::semaphore to fit the needs
* of the storage_resources class's tracking of byte/concurrency
Expand All @@ -33,14 +36,26 @@
* control the capacity of a semaphore.
*/
class adjustable_semaphore {
private:
// seastar's underlying semaphore count is backed by ssize_t, but it is
// initialized from a size_t with no boundary check
static constexpr size_t initialization_capacity{1};
static constexpr size_t max_capacity{std::numeric_limits<ssize_t>::max()};

public:
explicit adjustable_semaphore(uint64_t capacity)
: adjustable_semaphore(capacity, "s/allowance") {}

adjustable_semaphore(uint64_t capacity, const ss::sstring& sem_name)
: _sem(capacity, sem_name)
, _capacity(capacity) {}
: _sem(initialization_capacity, sem_name)
, _capacity(initialization_capacity) {
set_capacity(capacity);
}

void set_capacity(uint64_t capacity) noexcept {
vassert(
capacity <= max_capacity, "max size for semaphore is ssize_t::max");

if (capacity > _capacity) {
_sem.signal(capacity - _capacity);
} else if (capacity < _capacity) {
Expand Down
Loading