Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
/runtime
/output
/test/output

/bld
# Ignore hidden files
.*
*.swp
Expand Down
5 changes: 5 additions & 0 deletions src/braft/fsm_caller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ int FSMCaller::on_start_following(const LeaderChangeContext& start_following_con
return 0;
}


int FSMCaller::on_stop_following(const LeaderChangeContext& stop_following_context) {
ApplyTask task;
task.type = STOP_FOLLOWING;
Expand All @@ -496,6 +497,10 @@ void FSMCaller::do_stop_following(const LeaderChangeContext& stop_following_cont
_fsm->on_stop_following(stop_following_context);
}

void FSMCaller::on_pre_send_snapshot(const PeerId& peer_id) {
_fsm->on_pre_send_snapshot(peer_id);
}

void FSMCaller::describe(std::ostream &os, bool use_html) {
const char* newline = (use_html) ? "<br>" : "\n";
TaskType cur_task = _cur_task;
Expand Down
1 change: 1 addition & 0 deletions src/braft/fsm_caller.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class BAIDU_CACHELINE_ALIGNMENT FSMCaller {
int on_leader_start(int64_t term, int64_t lease_epoch);
int on_start_following(const LeaderChangeContext& start_following_context);
int on_stop_following(const LeaderChangeContext& stop_following_context);
void on_pre_send_snapshot(const PeerId& peer_id);
BRAFT_MOCK int on_error(const Error& e);
int64_t last_applied_index() const {
return _last_applied_index.load(butil::memory_order_relaxed);
Expand Down
10 changes: 10 additions & 0 deletions src/braft/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,7 @@ int NodeImpl::init(const NodeOptions& options) {
rg_options.election_timeout_ms = _options.election_timeout_ms;
rg_options.log_manager = _log_manager;
rg_options.ballot_box = _ballot_box;
rg_options.send_data_to_witness = _options.send_data_to_witness;
rg_options.node = this;
rg_options.snapshot_throttle = _options.snapshot_throttle
? _options.snapshot_throttle->get()
Expand Down Expand Up @@ -1366,6 +1367,10 @@ void NodeImpl::on_error(const Error& e) {
lck.unlock();
}

void NodeImpl::pre_send_snapshot(const PeerId& peer_id) {
_fsm_caller->on_pre_send_snapshot(peer_id);
}

void NodeImpl::handle_vote_timeout() {
std::unique_lock<raft_mutex_t> lck(_mutex);

Expand Down Expand Up @@ -3383,6 +3388,11 @@ bool NodeImpl::readonly() {
return _node_readonly || _majority_nodes_readonly;
}

int NodeImpl::change_witness_config(bool send_data_to_witness){
_options.send_data_to_witness = send_data_to_witness;
return _replicator_group.change_witness_config(send_data_to_witness);
}

int NodeImpl::change_readonly_config(int64_t term, const PeerId& peer_id, bool readonly) {
BAIDU_SCOPED_LOCK(_mutex);
if (term != _current_term && _state != STATE_LEADER) {
Expand Down
3 changes: 3 additions & 0 deletions src/braft/node.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ friend class VoteBallotCtx;
void leave_readonly_mode();
bool readonly();
int change_readonly_config(int64_t term, const PeerId& peer_id, bool readonly);
int change_witness_config(bool send_data_to_witness);
void check_majority_nodes_readonly();
void check_majority_nodes_readonly(const Configuration& conf);

Expand All @@ -241,6 +242,8 @@ friend class VoteBallotCtx;

bool disable_cli() const { return _options.disable_cli; }
bool is_witness() const { return _options.witness; }
// Called when leader start to send snapshot to remote peer
void pre_send_snapshot(const PeerId& peer_id);
private:
friend class butil::RefCountedThreadSafe<NodeImpl>;

Expand Down
5 changes: 5 additions & 0 deletions src/braft/raft.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,10 @@ bool Node::readonly() {
return _impl->readonly();
}

int Node::change_witness_config(bool send){
return _impl->change_witness_config(send);
}

// ------------- Iterator
void Iterator::next() {
if (valid()) {
Expand Down Expand Up @@ -313,6 +317,7 @@ void StateMachine::on_configuration_committed(const Configuration& conf, int64_t

void StateMachine::on_stop_following(const LeaderChangeContext&) {}
void StateMachine::on_start_following(const LeaderChangeContext&) {}
void StateMachine::on_pre_send_snapshot(const PeerId& peer_id) {}

BootstrapOptions::BootstrapOptions()
: last_log_index(0)
Expand Down
7 changes: 6 additions & 1 deletion src/braft/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ class StateMachine {
// the very leader whom the follower starts to follow.
// User can reset the node's information as it starts to follow some leader.
virtual void on_start_following(const ::braft::LeaderChangeContext& ctx);

// Invoked when the leader start to send snapshot to |peer_id|
// Default: Do nothing
virtual void on_pre_send_snapshot(const PeerId& peer_id);
};

enum State {
Expand Down Expand Up @@ -604,6 +608,7 @@ struct NodeOptions {
// Default: false
bool witness = false;
// Construct a default instance
bool send_data_to_witness = true;
NodeOptions();

int get_catchup_timeout_ms();
Expand Down Expand Up @@ -788,7 +793,7 @@ class Node {
// - This node is a leader, and the count of writable nodes in the group
// is less than the majority.
bool readonly();

int change_witness_config(bool send);
private:
NodeImpl* _impl;
};
Expand Down
49 changes: 46 additions & 3 deletions src/braft/replicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ DEFINE_int32(raft_retry_replicate_interval_ms, 1000,
"Interval of retry to append entries or install snapshot");
BRPC_VALIDATE_GFLAG(raft_retry_replicate_interval_ms,
brpc::PositiveInteger);
DEFINE_bool(raft_use_conn_pool, false, "use conn pool for raft replicator");
BRPC_VALIDATE_GFLAG(raft_use_conn_pool, ::brpc::PassValidate);

DECLARE_bool(raft_enable_witness_to_leader);
DECLARE_int64(raft_append_entry_high_lat_us);
Expand Down Expand Up @@ -115,6 +117,9 @@ int Replicator::start(const ReplicatorOptions& options, ReplicatorId *id) {
Replicator* r = new Replicator();
brpc::ChannelOptions channel_opt;
channel_opt.connect_timeout_ms = FLAGS_raft_rpc_channel_connect_timeout_ms;
if (FLAGS_raft_use_conn_pool) {
channel_opt.connection_type = "pooled";
}
channel_opt.timeout_ms = -1; // We don't need RPC timeout
if (r->_sending_channel.Init(options.peer_id.addr, &channel_opt) != 0) {
LOG(ERROR) << "Fail to init sending channel"
Expand Down Expand Up @@ -630,6 +635,11 @@ int Replicator::_prepare_entry(int offset, EntryMeta* em, butil::IOBuf *data) {
} else {
CHECK(entry->type != ENTRY_TYPE_CONFIGURATION) << "log_index=" << log_index;
}
// use group-level configuration preferentially
if (is_witness() && !_options.send_data_to_witness) {
entry->Release();
return 0;
}
if (!is_witness() || FLAGS_raft_enable_witness_to_leader) {
em->set_data_len(entry->data.length());
data->append(entry->data);
Expand Down Expand Up @@ -790,7 +800,7 @@ void Replicator::_install_snapshot() {
add_one_more_task(true)) {
return _block(butil::gettimeofday_us(), EBUSY);
}

node_impl->pre_send_snapshot(_options.peer_id);
// pre-set replicator state to INSTALLING_SNAPSHOT, so replicator could be
// blocked if something is wrong, such as throttled for a period of time
_st.st = INSTALLING_SNAPSHOT;
Expand Down Expand Up @@ -1224,6 +1234,21 @@ int Replicator::change_readonly_config(ReplicatorId id, bool readonly) {
return r->_change_readonly_config(readonly);
}

int Replicator::change_witness_config(ReplicatorId id, bool send){
Replicator *r = NULL;
bthread_id_t dummy_id = { id };
if (bthread_id_lock(dummy_id, (void**)&r) != 0) {
return 0;
}
return r->_change_witness_config(send);
}

int Replicator::_change_witness_config(bool send){
_options.send_data_to_witness = send;
CHECK_EQ(0, bthread_id_unlock(_id)) << "Fail to unlock " << _id;
return 0;
}

int Replicator::_change_readonly_config(bool readonly) {
if ((readonly && _readonly_index != 0) ||
(!readonly && _readonly_index == 0)) {
Expand Down Expand Up @@ -1382,6 +1407,7 @@ int ReplicatorGroup::init(const NodeId& node_id, const ReplicatorGroupOptions& o
_election_timeout_ms = options.election_timeout_ms;
_common_options.log_manager = options.log_manager;
_common_options.ballot_box = options.ballot_box;
_common_options.send_data_to_witness = options.send_data_to_witness;
_common_options.node = options.node;
_common_options.term = 0;
_common_options.group_id = node_id.group_id;
Expand Down Expand Up @@ -1549,12 +1575,18 @@ int ReplicatorGroup::find_the_next_candidate(
}
const int64_t next_index = Replicator::get_next_index(iter->id_and_status.id);
const int consecutive_error_times = Replicator::get_consecutive_error_times(iter->id_and_status.id);
if (consecutive_error_times == 0 && next_index > max_index && !iter->peer_id.is_witness()) {
if (consecutive_error_times == 0 && next_index > max_index) {
max_index = next_index;
if (peer_id) {
*peer_id = iter->peer_id;
}
}
// transfer leadership to the non witness peer priority.
if (consecutive_error_times == 0 && next_index == max_index) {
if (peer_id && peer_id->is_witness()) {
*peer_id = iter->peer_id;
}
}
}
if (max_index == 0) {
return -1;
Expand All @@ -1580,7 +1612,18 @@ void ReplicatorGroup::list_replicators(
out->push_back(std::make_pair(iter->first, iter->second.id));
}
}

int ReplicatorGroup::change_witness_config(bool send_data_to_witness){
_common_options.send_data_to_witness = send_data_to_witness;
for (std::map<PeerId, ReplicatorIdAndStatus>::const_iterator
iter = _rmap.begin(); iter != _rmap.end(); ++iter) {
int ret = Replicator::change_witness_config(iter->second.id, send_data_to_witness);
if(ret !=0) {
return ret;
}
}
return 0;
}

int ReplicatorGroup::change_readonly_config(const PeerId& peer, bool readonly) {
std::map<PeerId, ReplicatorIdAndStatus>::const_iterator iter = _rmap.find(peer);
if (iter == _rmap.end()) {
Expand Down
10 changes: 7 additions & 3 deletions src/braft/replicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ struct ReplicatorOptions {
ReplicatorOptions();
int* dynamic_heartbeat_timeout_ms;
int* election_timeout_ms;
bool send_data_to_witness;
GroupId group_id;
PeerId server_id;
PeerId peer_id;
Expand Down Expand Up @@ -124,7 +125,9 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator {
// Change the readonly config.
// Return 0 if success, the error code otherwise.
static int change_readonly_config(ReplicatorId id, bool readonly);

// Change the witness config.
// Return 0 if success, the error code otherwise.
static int change_witness_config(ReplicatorId id, bool send);
// Check if a replicator is readonly
static bool readonly(ReplicatorId id);

Expand Down Expand Up @@ -169,7 +172,7 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator {
return _next_index - _flying_append_entries_size;
}
int _change_readonly_config(bool readonly);

int _change_witness_config(bool send);
static void _on_rpc_returned(
ReplicatorId id, brpc::Controller* cntl,
AppendEntriesRequest* request,
Expand Down Expand Up @@ -267,6 +270,7 @@ struct ReplicatorGroupOptions {
ReplicatorGroupOptions();
int heartbeat_timeout_ms;
int election_timeout_ms;
bool send_data_to_witness = true;
LogManager* log_manager;
BallotBox* ballot_box;
NodeImpl* node;
Expand Down Expand Up @@ -357,7 +361,7 @@ class ReplicatorGroup {

// Change the readonly config for a peer
int change_readonly_config(const PeerId& peer, bool readonly);

int change_witness_config(bool send_data_to_witness);
// Check if a replicator is in readonly
bool readonly(const PeerId& peer) const;

Expand Down
Loading