Skip to content

Commit 79327d8

Browse files
hsun324facebook-github-bot
authored andcommitted
Make configuration changes non-blocking
Summary: Change the membership change command implementation to use a relaxed append to the log to avoid it blocking the operation of the RAFT server for log implementations that may block. Also separate out the `refresh` configuration change into its own command. Reviewed By: jaher Differential Revision: D79467254 fbshipit-source-id: 9df857e66984e84717be2722db9bab8fa68d82ce
1 parent 2a657bf commit 79327d8

File tree

2 files changed

+163
-80
lines changed

2 files changed

+163
-80
lines changed

include/wa_raft_rpc.hrl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
-define(RESIGN_COMMAND, ?RAFT_COMMAND(resign, undefined)).
8484

8585
-define(ADJUST_MEMBERSHIP_COMMAND(Action, Peer, ConfigIndex), ?RAFT_COMMAND(adjust_membership, {Action, Peer, ConfigIndex})).
86+
-define(REFRESH_CONFIG_COMMAND(), ?RAFT_COMMAND(refresh_config, undefined)).
8687

8788
-define(SNAPSHOT_AVAILABLE_COMMAND(Root, Position), ?RAFT_COMMAND(snapshot_available, {Root, Position})).
8889

src/wa_raft_server.erl

Lines changed: 162 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@
9898
snapshot_available/3,
9999
adjust_membership/3,
100100
adjust_membership/4,
101+
refresh_config/1,
101102
trigger_election/1,
102103
trigger_election/2,
103104
promote/2,
@@ -142,7 +143,7 @@
142143
compute_quorum/3,
143144
config/1,
144145
max_index_to_apply/3,
145-
adjust_config/3
146+
adjust_config_membership/4
146147
]).
147148
-endif.
148149

@@ -258,7 +259,7 @@
258259
-type rpc_named() :: ?RAFT_NAMED_RPC(atom(), wa_raft_log:log_term(), atom(), node(), undefined | tuple()).
259260

260261
-type command() :: commit_command() | read_command() | status_command() | trigger_election_command() |
261-
promote_command() | resign_command() | adjust_membership_command() |
262+
promote_command() | resign_command() | adjust_membership_command() | refresh_config_command() |
262263
snapshot_available_command() | handover_candidates_command() | handover_command() |
263264
enable_command() | disable_command() | bootstrap_command() | notify_complete_command().
264265

@@ -268,7 +269,8 @@
268269
-type trigger_election_command() :: ?TRIGGER_ELECTION_COMMAND(term_or_offset()).
269270
-type promote_command() :: ?PROMOTE_COMMAND(term_or_offset(), boolean()).
270271
-type resign_command() :: ?RESIGN_COMMAND.
271-
-type adjust_membership_command() :: ?ADJUST_MEMBERSHIP_COMMAND(membership_action(), peer() | undefined, wa_raft_log:log_index() | undefined).
272+
-type adjust_membership_command() :: ?ADJUST_MEMBERSHIP_COMMAND(membership_action(), peer(), wa_raft_log:log_index() | undefined).
273+
-type refresh_config_command() :: ?REFRESH_CONFIG_COMMAND().
272274
-type snapshot_available_command() :: ?SNAPSHOT_AVAILABLE_COMMAND(string(), wa_raft_log:log_pos()).
273275
-type handover_candidates_command() :: ?HANDOVER_CANDIDATES_COMMAND.
274276
-type handover_command() :: ?HANDOVER_COMMAND(node()).
@@ -283,7 +285,7 @@
283285

284286
-type timeout_type() :: election | heartbeat.
285287

286-
-type membership_action() :: add | add_witness | remove | remove_witness | refresh.
288+
-type membership_action() :: add | add_witness | remove | remove_witness.
287289

288290
%%------------------------------------------------------------------------------
289291
%% RAFT Server - OTP Supervision
@@ -520,13 +522,18 @@ adjust_membership(Server, Action, Peer) ->
520522

521523
-spec adjust_membership(
522524
Server :: gen_statem:server_ref(),
523-
Action :: add | remove | add_witness | remove_witness,
525+
Action :: membership_action(),
524526
Peer :: peer(),
525527
ConfigIndex :: wa_raft_log:log_index() | undefined
526528
) -> {ok, Position :: wa_raft_log:log_pos()} | {error, Reason :: term()}.
527529
adjust_membership(Server, Action, Peer, ConfigIndex) ->
528530
gen_statem:call(Server, ?ADJUST_MEMBERSHIP_COMMAND(Action, Peer, ConfigIndex), ?RAFT_RPC_CALL_TIMEOUT()).
529531

532+
-spec refresh_config(Server :: gen_statem:server_ref()) ->
533+
{ok, Position :: wa_raft_log:log_pos()} | {error, Reason :: term()}.
534+
refresh_config(Server) ->
535+
gen_statem:call(Server, ?REFRESH_CONFIG_COMMAND(), ?RAFT_RPC_CALL_TIMEOUT()).
536+
530537
%% Request the specified RAFT server to start an election in the next term.
531538
-spec trigger_election(Server :: gen_statem:server_ref()) -> ok | {error, Reason :: term()}.
532539
trigger_election(Server) ->
@@ -1252,73 +1259,99 @@ leader({call, From}, ?RESIGN_COMMAND, #raft_state{} = State0) ->
12521259
leader(
12531260
Type,
12541261
?ADJUST_MEMBERSHIP_COMMAND(Action, Peer, ExpectedConfigIndex),
1255-
#raft_state{
1256-
log_view = View0,
1257-
storage = Storage,
1258-
current_term = CurrentTerm,
1259-
commit_index = CommitIndex,
1260-
first_current_term_log_index = TermStartIndex
1261-
} = State0
1262+
#raft_state{cached_config = {CachedConfigIndex, Config}} = State0
12621263
) ->
1263-
% Try to adjust the configuration according to the current request.
1264-
Config = config(State0),
1265-
% eqwalizer:ignore Peer can be undefined
1266-
case adjust_config({Action, Peer}, Config, State0) of
1267-
{ok, NewConfig} ->
1268-
% Ensure that we have committed at least one log entry in the current
1269-
% term before admitting any membership change operations.
1270-
case CommitIndex >= TermStartIndex of
1264+
case config_change_allowed(State0) of
1265+
true ->
1266+
% At this point, we know that the cached config is the effective
1267+
% config as no uncommited config is allowed.
1268+
case ExpectedConfigIndex =/= undefined andalso ExpectedConfigIndex =/= CachedConfigIndex of
12711269
true ->
1272-
% Ensure that there is no as-of-yet uncomitted config.
1273-
StorageConfigIndex = case wa_raft_storage:config(Storage) of
1274-
{ok, #raft_log_pos{index = SI}, _} -> SI;
1275-
undefined -> 0
1276-
end,
1277-
LogConfigIndex = case wa_raft_log:config(View0) of
1278-
{ok, LI, _} -> LI;
1279-
not_found -> 0
1280-
end,
1281-
ConfigIndex = max(StorageConfigIndex, LogConfigIndex),
1282-
case ConfigIndex > CommitIndex of
1283-
true ->
1284-
?SERVER_LOG_NOTICE(State0, "at ~0p refusing to ~0p peer ~0p because it has a pending reconfiguration (storage: ~0p, log: ~0p).",
1285-
[CommitIndex, Action, Peer, StorageConfigIndex, LogConfigIndex]),
1286-
reply(Type, {error, rejected}),
1287-
{keep_state, State0};
1288-
false ->
1289-
case ExpectedConfigIndex =/= undefined andalso ExpectedConfigIndex =/= ConfigIndex of
1290-
true ->
1291-
?SERVER_LOG_NOTICE(State0, "refusing to ~0p peer ~0p because it has a different config index than expected (storage: ~0p, log: ~0p expected: ~0p).",
1292-
[Action, Peer, StorageConfigIndex, LogConfigIndex, ExpectedConfigIndex]),
1293-
reply(Type, {error, rejected}),
1294-
{keep_state, State0};
1295-
false ->
1296-
% Now that we have completed all the required checks, the leader is free to
1297-
% attempt to change the config. We heartbeat immediately to make the change as
1298-
% soon as possible.
1299-
Op = {make_ref(), {config, NewConfig}},
1300-
{LogEntry, State1} = make_log_entry(Op, State0),
1301-
{ok, View1} = wa_raft_log:append(View0, [LogEntry]),
1302-
LogIndex = wa_raft_log:last_index(View1),
1303-
?SERVER_LOG_NOTICE(State1, "appended configuration change from ~0p to ~0p at log index ~0p.", [Config, NewConfig, LogIndex]),
1304-
State2 = State1#raft_state{log_view = View1},
1305-
State3 = apply_single_node_cluster(State2),
1306-
State4 = append_entries_to_followers(State3),
1307-
reply(Type, {ok, #raft_log_pos{index = LogIndex, term = CurrentTerm}}),
1308-
{keep_state, State4, ?HEARTBEAT_TIMEOUT(State4)}
1309-
end
1310-
end;
1311-
false ->
1312-
?SERVER_LOG_NOTICE(State0, "refusing to ~0p peer ~0p because it has not established current term commit quorum.", [Action, Peer]),
1270+
?SERVER_LOG_NOTICE(
1271+
State0,
1272+
"refuses a membership change request because the request expects that the effective configuration is at ~0p when it is actually at ~0p",
1273+
[ExpectedConfigIndex, CachedConfigIndex]
1274+
),
13131275
reply(Type, {error, rejected}),
1276+
{keep_state, State0};
1277+
false ->
1278+
case adjust_config_membership(Action, Peer, Config, State0) of
1279+
{ok, NewConfig} ->
1280+
% With all checks completed, we can now attempt to append the new
1281+
% configuration to the log. If successful, a round of heartbeats is
1282+
% immediately started to replicate the change as soon as possible.
1283+
case change_config(NewConfig, State0) of
1284+
{ok, #raft_log_pos{index = NewConfigIndex} = NewConfigPosition, State1} ->
1285+
?SERVER_LOG_NOTICE(
1286+
State1,
1287+
"has appended a new configuration change from ~0p to ~0p at log index ~0p.",
1288+
[Config, NewConfig, NewConfigIndex]
1289+
),
1290+
State2 = apply_single_node_cluster(State1),
1291+
State3 = append_entries_to_followers(State2),
1292+
reply(Type, {ok, NewConfigPosition}),
1293+
{keep_state, State3, ?HEARTBEAT_TIMEOUT(State3)};
1294+
{error, Reason} ->
1295+
?SERVER_LOG_NOTICE(
1296+
State0,
1297+
"failed to append a new configuration due to ~0P.",
1298+
[Reason, 20]
1299+
),
1300+
reply(Type, {error, Reason}),
1301+
{keep_state, State0}
1302+
end;
1303+
{error, Reason} ->
1304+
?SERVER_LOG_NOTICE(
1305+
State0,
1306+
"cannot ~0p peer ~0p against configuration ~0p due to ~0P.",
1307+
[Action, Peer, Config, Reason, 20]
1308+
),
1309+
reply(Type, {error, Reason}),
1310+
{keep_state, State0}
1311+
end
1312+
end;
1313+
false ->
1314+
reply(Type, {error, rejected}),
1315+
{keep_state, State0}
1316+
end;
1317+
1318+
%% [Refresh Config] Leader attempts to refresh the current config in-place.
1319+
leader(
1320+
Type,
1321+
?REFRESH_CONFIG_COMMAND(),
1322+
#raft_state{cached_config = {_, Config}} = State0
1323+
) ->
1324+
case config_change_allowed(State0) of
1325+
true ->
1326+
% With all checks completed, we can now attempt to append the new
1327+
% configuration to the log. If successful, a round of heartbeats is
1328+
% immediately started to replicate the change as soon as possible.
1329+
case change_config(Config, State0) of
1330+
{ok, #raft_log_pos{index = NewConfigIndex} = NewConfigPosition, State1} ->
1331+
?SERVER_LOG_NOTICE(
1332+
State1,
1333+
"has refreshed the current configuration ~0p at log index ~0p.",
1334+
[Config, NewConfigIndex]
1335+
),
1336+
State2 = apply_single_node_cluster(State1),
1337+
State3 = append_entries_to_followers(State2),
1338+
reply(Type, {ok, NewConfigPosition}),
1339+
{keep_state, State3, ?HEARTBEAT_TIMEOUT(State3)};
1340+
{error, Reason} ->
1341+
?SERVER_LOG_NOTICE(
1342+
State0,
1343+
"failed to append a new configuration due to ~0P.",
1344+
[Reason, 20]
1345+
),
1346+
reply(Type, {error, Reason}),
13141347
{keep_state, State0}
13151348
end;
1316-
{error, Reason} ->
1317-
?SERVER_LOG_NOTICE(State0, "refusing to ~0p peer ~0p on configuration ~0p due to ~0p.", [Action, Peer, Config, Reason]),
1318-
reply(Type, {error, Reason}),
1349+
false ->
1350+
reply(Type, {error, rejected}),
13191351
{keep_state, State0}
13201352
end;
13211353

1354+
13221355
%% [Handover Candidates] Return list of handover candidates (peers that are not lagging too much)
13231356
leader({call, From}, ?HANDOVER_CANDIDATES_COMMAND, #raft_state{} = State) ->
13241357
{keep_state_and_data, {reply, From, {ok, compute_handover_candidates(State)}}};
@@ -2846,42 +2879,91 @@ is_eligible_for_handover(
28462879
LastAppliedIndex = maps:get(CandidateId, LastAppliedIndices, 0),
28472880
MatchIndex >= MatchCutoffIndex andalso LastAppliedIndex >= ApplyCutoffIndex.
28482881

2849-
-spec adjust_config(
2850-
Action :: {add, peer()} | {remove, peer()} | {add_witness, peer()} | {remove_witness, peer()} | {refresh, undefined},
2882+
%%------------------------------------------------------------------------------
2883+
%% RAFT Server - State Machine Implementation - Configuration Changes
2884+
%%------------------------------------------------------------------------------
2885+
2886+
-spec config_change_allowed(State :: #raft_state{}) -> boolean().
2887+
config_change_allowed(
2888+
#raft_state{
2889+
log_view = View,
2890+
commit_index = CommitIndex,
2891+
last_applied = LastApplied,
2892+
first_current_term_log_index = TermStartIndex
2893+
} = State
2894+
) ->
2895+
% A leader must establish quorum on at least one log entry in the current
2896+
% term because it is ready for a configuration change.
2897+
case CommitIndex >= TermStartIndex of
2898+
true ->
2899+
% No two configuration changes can be in the log at the same time
2900+
% and both be not yet committed.
2901+
case wa_raft_log:config(View) of
2902+
{ok, ConfigIndex, _} when ConfigIndex > LastApplied ->
2903+
?SERVER_LOG_NOTICE(
2904+
leader,
2905+
State,
2906+
"at ~0p is not ready for a new configuration because a new configuration is not yet committed at ~0p.",
2907+
[CommitIndex, ConfigIndex]
2908+
),
2909+
false;
2910+
_ ->
2911+
true
2912+
end;
2913+
false ->
2914+
?SERVER_LOG_NOTICE(
2915+
leader,
2916+
State,
2917+
"at ~0p is not ready for a new configuration because it has not established a quorum in the current term.",
2918+
[CommitIndex]
2919+
),
2920+
false
2921+
end.
2922+
2923+
-spec change_config(NewConfig :: wa_raft_server:config(), State :: #raft_state{}) ->
2924+
{ok, NewConfigPosition :: wa_raft_log:log_pos(), NewState :: #raft_state{}} | {error, Reason :: term()}.
2925+
change_config(NewConfig, #raft_state{log_view = View, current_term = CurrentTerm} = State0) ->
2926+
{LogEntry, State1} = make_log_entry({make_ref(), {config, NewConfig}}, State0),
2927+
case wa_raft_log:try_append(View, [LogEntry]) of
2928+
{ok, NewView} ->
2929+
NewConfigPosition = #raft_log_pos{index = wa_raft_log:last_index(NewView), term = CurrentTerm},
2930+
State2 = State1#raft_state{log_view = NewView},
2931+
{ok, NewConfigPosition, State2};
2932+
skipped ->
2933+
{error, commit_stalled};
2934+
{error, Reason} ->
2935+
{error, Reason}
2936+
end.
2937+
2938+
-spec adjust_config_membership(
2939+
Action :: membership_action(),
2940+
Peer :: peer(),
28512941
Config :: config(),
28522942
State :: #raft_state{}
28532943
) -> {ok, NewConfig :: config()} | {error, Reason :: atom()}.
2854-
adjust_config(Action, Config, #raft_state{self = Self}) ->
2944+
adjust_config_membership(Action, {Name, Node}, Config, #raft_state{self = Self}) ->
2945+
PeerIdentity = ?IDENTITY_REQUIRES_MIGRATION(Name, Node),
28552946
Membership = get_config_members(Config),
28562947
Witness = get_config_witnesses(Config),
28572948
case Action of
2858-
% The 'refresh' action is used to commit the current effective configuration to storage in the
2859-
% case of upgrading from adhoc to stored configuration or to materialize changes to the format
2860-
% of stored configurations.
2861-
{refresh, undefined} ->
2862-
{ok, Config};
2863-
{add, {Name, Node}} ->
2864-
PeerIdentity = ?IDENTITY_REQUIRES_MIGRATION(Name, Node),
2949+
add ->
28652950
case lists:member(PeerIdentity, Membership) of
28662951
true -> {error, already_member};
28672952
false -> {ok, set_config_members([PeerIdentity | Membership], Config)}
28682953
end;
2869-
{add_witness, {Name, Node}} ->
2870-
PeerIdentity = ?IDENTITY_REQUIRES_MIGRATION(Name, Node),
2954+
add_witness ->
28712955
case {lists:member(PeerIdentity, Witness), lists:member(PeerIdentity, Membership)} of
28722956
{true, true} -> {error, already_witness};
28732957
{true, _} -> {error, already_member};
28742958
{false, _} -> {ok, set_config_members([PeerIdentity | Membership], [PeerIdentity | Witness], Config)}
28752959
end;
2876-
{remove, {Name, Node}} ->
2877-
PeerIdentity = ?IDENTITY_REQUIRES_MIGRATION(Name, Node),
2960+
remove ->
28782961
case {PeerIdentity, lists:member(PeerIdentity, Membership)} of
28792962
{Self, _} -> {error, cannot_remove_self};
28802963
{_, false} -> {error, not_a_member};
28812964
{_, true} -> {ok, set_config_members(lists:delete(PeerIdentity, Membership), lists:delete(PeerIdentity, Witness), Config)}
28822965
end;
2883-
{remove_witness, {Name, Node}} ->
2884-
PeerIdentity = ?IDENTITY_REQUIRES_MIGRATION(Name, Node),
2966+
remove_witness ->
28852967
case {PeerIdentity, lists:member(PeerIdentity, Witness)} of
28862968
{Self, _} -> {error, cannot_remove_self};
28872969
{_, false} -> {error, not_a_witness};

0 commit comments

Comments
 (0)