Skip to content

Commit 2946d2d

Browse files
committed
fix-peers-change-failed-when-cluster-restart-in-joint-status
1 parent e32b78a commit 2946d2d

File tree

5 files changed

+501
-9
lines changed

5 files changed

+501
-9
lines changed

src/braft/node.cpp

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@
3333
#include "braft/node_manager.h"
3434
#include "braft/snapshot_executor.h"
3535
#include "braft/errno.pb.h"
36+
#include "braft/sync_point.h"
37+
#include "butil/logging.h"
38+
3639

3740
namespace braft {
3841

@@ -1585,9 +1588,9 @@ void NodeImpl::pre_vote(std::unique_lock<raft_mutex_t>* lck, bool triggered) {
15851588
" configuration is possibly out of date";
15861589
return;
15871590
}
1588-
if (!_conf.contains(_server_id)) {
1591+
if (_conf.empty()) {
15891592
LOG(WARNING) << "node " << _group_id << ':' << _server_id
1590-
<< " can't do pre_vote as it is not in " << _conf.conf;
1593+
<< " can't do pre_vote as conf is emtpy";
15911594
return;
15921595
}
15931596

@@ -1644,9 +1647,9 @@ void NodeImpl::elect_self(std::unique_lock<raft_mutex_t>* lck,
16441647
bool old_leader_stepped_down) {
16451648
LOG(INFO) << "node " << _group_id << ":" << _server_id
16461649
<< " term " << _current_term << " start vote and grant vote self";
1647-
if (!_conf.contains(_server_id)) {
1650+
if (_conf.empty()) {
16481651
LOG(WARNING) << "node " << _group_id << ':' << _server_id
1649-
<< " can't do elect_self as it is not in " << _conf.conf;
1652+
<< " can't do elect_self as _conf is empty";
16501653
return;
16511654
}
16521655
// cancel follower election timer
@@ -2108,10 +2111,10 @@ int NodeImpl::handle_pre_vote_request(const RequestVoteRequest* request,
21082111
LogId last_log_id = _log_manager->last_log_id(true);
21092112
lck.lock();
21102113
// pre_vote not need ABA check after unlock&lock
2111-
21122114
int64_t votable_time = _follower_lease.votable_time_from_now();
21132115
bool grantable = (LogId(request->last_log_index(), request->last_log_term())
21142116
>= last_log_id);
2117+
BRAFT_VLOG<<"grantable "<< grantable;
21152118
if (grantable) {
21162119
granted = (votable_time == 0);
21172120
rejected_by_lease = (votable_time > 0);
@@ -3267,6 +3270,7 @@ void NodeImpl::ConfigurationCtx::next_stage() {
32673270
// implementation.
32683271
case STAGE_JOINT:
32693272
_stage = STAGE_STABLE;
3273+
TEST_SYNC_POINT_CALLBACK("NodeImpl::ConfigurationCtx:StableStage:BeforeApplyConfiguration", _node);
32703274
return _node->unsafe_apply_configuration(
32713275
Configuration(_new_peers), NULL, false);
32723276
case STAGE_STABLE:

src/braft/sync_point.cpp

Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
// Copyright (c) 2017 Baidu.com, Inc. All Rights Reserved
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
16+
// This source code is licensed under both the GPLv2 (found in the
17+
// COPYING file in the root directory) and Apache 2.0 License
18+
// (found in the LICENSE.Apache file in the root directory).
19+
20+
#include "sync_point.h"
21+
22+
#include <atomic>
23+
#include <condition_variable>
24+
#include <functional>
25+
#include <mutex>
26+
#include <string>
27+
#include <thread>
28+
#include <unordered_map>
29+
#include <unordered_set>
30+
31+
#include <fcntl.h>
32+
33+
#ifndef NDEBUG
34+
namespace braft {
35+
36+
// Private state and logic behind SyncPoint. The SyncPoint member functions in
// this file forward to the members of the same name declared here.
struct SyncPoint::Data {
    Data() : enabled_(false) {}
    // Virtual so that deletion through a Data* stays correct for subclasses.
    virtual ~Data() = default;

    // Dependency graph filled in by LoadDependency / LoadDependencyAndMarkers:
    // successors_ maps a point to the points registered after it, and
    // predecessors_ maps a point to the points it has to wait for.
    std::unordered_map<std::string, std::vector<std::string>> successors_;
    std::unordered_map<std::string, std::vector<std::string>> predecessors_;
    // Callback registered per sync point; invoked by Process with its cb_arg.
    std::unordered_map<std::string, std::function<void(void *)>> callbacks_;
    // Marker point -> points it marks (filled in by LoadDependencyAndMarkers).
    std::unordered_map<std::string, std::vector<std::string>> markers_;
    // Marked point -> id of the thread that passed the corresponding marker
    // (recorded by Process).
    std::unordered_map<std::string, std::thread::id> marked_thread_id_;

    std::mutex mutex_;
    std::condition_variable cv_;
    // Sync points that have already been passed through.
    std::unordered_set<std::string> cleared_points_;
    // Process() returns immediately while this is false.
    std::atomic<bool> enabled_;
    // Number of callbacks currently executing inside Process(); the
    // ClearCallBack/ClearAllCallBacks functions wait for it to drop to zero.
    int num_callbacks_running_ = 0;

    void LoadDependency(const std::vector<SyncPointPair> &dependencies);
    void LoadDependencyAndMarkers(const std::vector<SyncPointPair> &dependencies,
                                  const std::vector<SyncPointPair> &markers);
    bool PredecessorsAllCleared(const std::string &point);

    // Registers (or replaces) the callback of `point` while holding mutex_.
    void SetCallBack(const std::string &point,
                     const std::function<void(void *)> &callback) {
        std::lock_guard<std::mutex> guard(mutex_);
        callbacks_[point] = callback;
    }

    void ClearCallBack(const std::string &point);
    void ClearAllCallBacks();

    // Turn sync point processing on / off.
    void EnableProcessing() { enabled_.store(true); }
    void DisableProcessing() { enabled_.store(false); }

    // Forgets which sync points have been passed so far.
    void ClearTrace() {
        std::lock_guard<std::mutex> guard(mutex_);
        cleared_points_.clear();
    }

    // Returns true when `point` has a recorded marking thread and that thread
    // is not `thread_id`. Takes no lock itself; within this file it is only
    // called from Process() while mutex_ is held.
    bool DisabledByMarker(const std::string &point, std::thread::id thread_id) {
        const auto marked = marked_thread_id_.find(point);
        if (marked == marked_thread_id_.end()) {
            return false;
        }
        return marked->second != thread_id;
    }

    void Process(const std::string &point, void *cb_arg);
};
79+
80+
// Returns the address of the single SyncPoint object, which is a
// function-local static constructed on the first call.
SyncPoint *SyncPoint::GetInstance() {
    static SyncPoint instance;
    return &instance;
}
84+
85+
// Allocates the private Data object; it is released in the destructor.
SyncPoint::SyncPoint()
    : impl_(new Data) {
}
86+
87+
// Frees the Data object allocated by the constructor.
SyncPoint::~SyncPoint() {
    delete impl_;
}
88+
89+
void SyncPoint::LoadDependency(const std::vector<SyncPointPair> &dependencies) {
90+
impl_->LoadDependency(dependencies);
91+
}
92+
93+
void SyncPoint::LoadDependencyAndMarkers(
94+
const std::vector<SyncPointPair> &dependencies,
95+
const std::vector<SyncPointPair> &markers) {
96+
impl_->LoadDependencyAndMarkers(dependencies, markers);
97+
}
98+
99+
void SyncPoint::SetCallBack(const std::string &point,
100+
const std::function<void(void *)> &callback) {
101+
impl_->SetCallBack(point, callback);
102+
}
103+
104+
void SyncPoint::ClearCallBack(const std::string &point) {
105+
impl_->ClearCallBack(point);
106+
}
107+
108+
// Forwards to Data::ClearAllCallBacks.
void SyncPoint::ClearAllCallBacks() {
    impl_->ClearAllCallBacks();
}
109+
110+
// Forwards to Data::EnableProcessing.
void SyncPoint::EnableProcessing() {
    impl_->EnableProcessing();
}
111+
112+
// Forwards to Data::DisableProcessing.
void SyncPoint::DisableProcessing() {
    impl_->DisableProcessing();
}
113+
114+
// Forwards to Data::ClearTrace.
void SyncPoint::ClearTrace() {
    impl_->ClearTrace();
}
115+
116+
void SyncPoint::Process(const std::string &point, void *cb_arg) {
117+
impl_->Process(point, cb_arg);
118+
}
119+
120+
void SyncPoint::Data::LoadDependency(
121+
const std::vector<SyncPointPair> &dependencies) {
122+
std::lock_guard<std::mutex> lock(mutex_);
123+
successors_.clear();
124+
predecessors_.clear();
125+
cleared_points_.clear();
126+
for (const auto &dependency : dependencies) {
127+
successors_[dependency.predecessor].push_back(dependency.successor);
128+
predecessors_[dependency.successor].push_back(dependency.predecessor);
129+
}
130+
cv_.notify_all();
131+
}
132+
133+
void SyncPoint::Data::LoadDependencyAndMarkers(
134+
const std::vector<SyncPointPair> &dependencies,
135+
const std::vector<SyncPointPair> &markers) {
136+
std::lock_guard<std::mutex> lock(mutex_);
137+
successors_.clear();
138+
predecessors_.clear();
139+
cleared_points_.clear();
140+
markers_.clear();
141+
marked_thread_id_.clear();
142+
for (const auto &dependency : dependencies) {
143+
successors_[dependency.predecessor].push_back(dependency.successor);
144+
predecessors_[dependency.successor].push_back(dependency.predecessor);
145+
}
146+
for (const auto &marker : markers) {
147+
successors_[marker.predecessor].push_back(marker.successor);
148+
predecessors_[marker.successor].push_back(marker.predecessor);
149+
markers_[marker.predecessor].push_back(marker.successor);
150+
}
151+
cv_.notify_all();
152+
}
153+
154+
bool SyncPoint::Data::PredecessorsAllCleared(const std::string &point) {
155+
for (const auto &pred : predecessors_[point]) {
156+
if (cleared_points_.count(pred) == 0) {
157+
return false;
158+
}
159+
}
160+
return true;
161+
}
162+
163+
void SyncPoint::Data::ClearCallBack(const std::string &point) {
164+
std::unique_lock<std::mutex> lock(mutex_);
165+
while (num_callbacks_running_ > 0) {
166+
cv_.wait(lock);
167+
}
168+
callbacks_.erase(point);
169+
}
170+
171+
void SyncPoint::Data::ClearAllCallBacks() {
172+
std::unique_lock<std::mutex> lock(mutex_);
173+
while (num_callbacks_running_ > 0) {
174+
cv_.wait(lock);
175+
}
176+
callbacks_.clear();
177+
}
178+
179+
void SyncPoint::Data::Process(const std::string &point, void *cb_arg) {
180+
if (!enabled_) {
181+
return;
182+
}
183+
184+
std::unique_lock<std::mutex> lock(mutex_);
185+
auto thread_id = std::this_thread::get_id();
186+
187+
auto marker_iter = markers_.find(point);
188+
if (marker_iter != markers_.end()) {
189+
for (auto &marked_point : marker_iter->second) {
190+
marked_thread_id_.emplace(marked_point, thread_id);
191+
}
192+
}
193+
194+
if (DisabledByMarker(point, thread_id)) {
195+
return;
196+
}
197+
198+
while (!PredecessorsAllCleared(point)) {
199+
cv_.wait(lock);
200+
if (DisabledByMarker(point, thread_id)) {
201+
return;
202+
}
203+
}
204+
205+
auto callback_pair = callbacks_.find(point);
206+
if (callback_pair != callbacks_.end()) {
207+
num_callbacks_running_++;
208+
mutex_.unlock();
209+
callback_pair->second(cb_arg);
210+
mutex_.lock();
211+
num_callbacks_running_--;
212+
}
213+
cleared_points_.insert(point);
214+
cv_.notify_all();
215+
}
216+
} // namespace braft
217+
#endif // NDEBUG

0 commit comments

Comments
 (0)