Skip to content

Commit 28132a0

Browse files
hsun324 authored and facebook-github-bot committed
Track commit request lifetime across terms
Summary: Track ongoing commit requests within the RAFT server state itself, keyed by the log index of the log entry assigned to each commit request. This fixes an issue where a replica that lost leadership would immediately cancel its ongoing commit requests with `{error, not_leader}`, even when a commit could still be processed because its log entry was not subsequently changed.

Reviewed By: jaher
Differential Revision: D75901229
fbshipit-source-id: ce89d3e27bf90a8c588bf72204ea608a74f128c7
1 parent 177475f commit 28132a0

File tree

7 files changed

+210
-178
lines changed

7 files changed

+210
-178
lines changed

include/wa_raft.hrl

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,6 @@
373373
% Queue options
374374
queue_name :: atom(),
375375
queue_counters :: atomics:atomics_ref(),
376-
queue_commits :: atom(),
377376
queue_reads :: atom(),
378377

379378
% Server options
@@ -463,10 +462,13 @@
463462
%% that are in queue to be appended and replicated after a short
464463
%% wait to see if multiple commits can be handled at once to
465464
%% reduce overhead
466-
pending = [] :: [wa_raft_acceptor:op()],
465+
pending = [] :: [{gen_server:from(), wa_raft_acceptor:op()}],
467466
%% [Leader] Whether or not a read has been accepted and is waiting for the
468467
%% leader to establish a new quorum to be handled.
469468
pending_read = false :: boolean(),
469+
%% [Leader] The queue of accepted commit requests that are waiting to be
470+
%% committed and applied for response to the client.
471+
queued = #{} :: #{wa_raft_log:log_index() => gen_server:from()},
470472
%% [Leader] The index of the next log entry to send in the next heartbeat
471473
%% to each peer
472474
next_indices = #{} :: #{node() => wa_raft_log:log_index()},

include/wa_raft_rpc.hrl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@
7474

7575
-define(RAFT_COMMAND(Type, Payload), {command, Type, Payload}).
7676

77-
-define(COMMIT_COMMAND(Op), ?RAFT_COMMAND(commit, Op)).
77+
-define(COMMIT_COMMAND(From, Op), ?RAFT_COMMAND(commit, {From, Op})).
7878
-define(READ_COMMAND(Op), ?RAFT_COMMAND(read, Op)).
7979

8080
-define(STATUS_COMMAND, ?RAFT_COMMAND(status, undefined)).

src/wa_raft_acceptor.erl

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,11 @@
8282
-type commit_async_request() :: {commit, From :: gen_server:from(), Op :: op()}.
8383
-type commit_error_type() ::
8484
not_leader |
85-
{duplicate_request, Key :: key()} |
8685
{commit_queue_full, Key :: key()} |
8786
{apply_queue_full, Key :: key()} |
8887
{notify_redirect, Peer :: node()} |
89-
commit_stalled.
88+
commit_stalled |
89+
cancelled.
9090
-type commit_error() :: {error, commit_error_type()}.
9191
-type commit_result() :: Result :: dynamic() | Error :: commit_error() | call_error().
9292

@@ -244,24 +244,19 @@ commit_impl(From, {Key, _} = Op, #state{name = Name, server = Server, queues = Q
244244
try
245245
?LOG_DEBUG("Acceptor[~0p] starts to handle commit of ~0P from ~0p.",
246246
[Name, Op, 30, From], #{domain => [whatsapp, wa_raft]}),
247-
case wa_raft_queue:commit(Queues, Key, From) of
248-
duplicate ->
249-
?LOG_WARNING("Acceptor[~0p] is rejecting commit request from ~0p because it has duplicate key ~0p.",
250-
[Name, From, Key], #{domain => [whatsapp, wa_raft]}),
251-
?RAFT_COUNT('raft.acceptor.error.duplicate_commit'),
252-
{error, {duplicate_request, Key}};
247+
case wa_raft_queue:commit_started(Queues) of
253248
commit_queue_full ->
254-
?LOG_WARNING("Acceptor[~0p] is rejecting commit request from ~0p with key ~0p because the commit queue is full.",
255-
[Name, From, Key], #{domain => [whatsapp, wa_raft]}),
249+
?LOG_WARNING("Acceptor[~0p] is rejecting commit request from ~0p because the commit queue is full.",
250+
[Name, From], #{domain => [whatsapp, wa_raft]}),
256251
?RAFT_COUNT('raft.acceptor.error.commit_queue_full'),
257252
{error, {commit_queue_full, Key}};
258253
apply_queue_full ->
259-
?LOG_WARNING("Acceptor[~0p] is rejecting commit request from ~0p with key ~0p because the apply queue is full.",
260-
[Name, From, Key], #{domain => [whatsapp, wa_raft]}),
254+
?LOG_WARNING("Acceptor[~0p] is rejecting commit request from ~0p because the apply queue is full.",
255+
[Name, From], #{domain => [whatsapp, wa_raft]}),
261256
?RAFT_COUNT('raft.acceptor.error.apply_queue_full'),
262257
{error, {apply_queue_full, Key}};
263258
ok ->
264-
wa_raft_server:commit(Server, Op),
259+
wa_raft_server:commit(Server, From, Op),
265260
continue
266261
end
267262
after

src/wa_raft_part_sup.erl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,6 @@ normalize_spec(Application, #{table := Table, partition := Partition} = Spec) ->
155155
log_catchup_name = wa_raft_log_catchup:default_name(Table, Partition),
156156
queue_name = wa_raft_queue:default_name(Table, Partition),
157157
queue_counters = wa_raft_queue:default_counters(),
158-
queue_commits = wa_raft_queue:default_commit_queue_name(Table, Partition),
159158
queue_reads = wa_raft_queue:default_read_queue_name(Table, Partition),
160159
server_name = ServerName,
161160
storage_name = StorageName,

src/wa_raft_queue.erl

Lines changed: 21 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,15 @@
2929
-export([
3030
default_name/2,
3131
default_counters/0,
32-
default_commit_queue_name/2,
3332
default_read_queue_name/2,
3433
registered_name/2
3534
]).
3635

3736
%% PENDING COMMIT QUEUE API
3837
-export([
39-
commit/3,
40-
fulfill_commit/3,
41-
fulfill_incomplete_commit/3,
42-
fulfill_all_commits/2
38+
commit_started/1,
39+
commit_cancelled/3,
40+
commit_completed/3
4341
]).
4442

4543
%% PENDING READ API
@@ -108,7 +106,6 @@
108106
-record(queues, {
109107
application :: atom(),
110108
counters :: atomics:atomics_ref(),
111-
commits :: atom(),
112109
reads :: atom()
113110
}).
114111
-opaque queues() :: #queues{}.
@@ -122,7 +119,6 @@ queues(Options) ->
122119
#queues{
123120
application = Options#raft_options.application,
124121
counters = Options#raft_options.queue_counters,
125-
commits = Options#raft_options.queue_commits,
126122
reads = Options#raft_options.queue_reads
127123
}.
128124

@@ -204,12 +200,6 @@ default_name(Table, Partition) ->
204200
default_counters() ->
205201
atomics:new(?RAFT_NUMBER_OF_QUEUE_SIZE_COUNTERS, []).
206202

207-
%% Get the default name for the RAFT commit queue ETS table associated with the
208-
%% provided RAFT partition.
209-
-spec default_commit_queue_name(Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> Name :: atom().
210-
default_commit_queue_name(Table, Partition) ->
211-
binary_to_atom(<<"raft_commit_queue_", (atom_to_binary(Table))/bytes, "_", (integer_to_binary(Partition))/bytes>>).
212-
213203
%% Get the default name for the RAFT read queue ETS table associated with the
214204
%% provided RAFT partition.
215205
-spec default_read_queue_name(Table :: wa_raft:table(), Partition :: wa_raft:partition()) -> Name :: atom().
@@ -229,9 +219,8 @@ registered_name(Table, Partition) ->
229219
%% PENDING COMMIT QUEUE API
230220
%%-------------------------------------------------------------------
231221

232-
-spec commit(Queues :: queues(), Key :: wa_raft_acceptor:key(), From :: gen_server:from()) ->
233-
ok | apply_queue_full | commit_queue_full | duplicate.
234-
commit(#queues{counters = Counters, commits = Commits} = Queues, Reference, From) ->
222+
-spec commit_started(Queues :: queues()) -> ok | apply_queue_full | commit_queue_full.
223+
commit_started(#queues{counters = Counters} = Queues) ->
235224
case commit_queue_full(Queues) of
236225
true ->
237226
commit_queue_full;
@@ -240,47 +229,24 @@ commit(#queues{counters = Counters, commits = Commits} = Queues, Reference, From
240229
true ->
241230
apply_queue_full;
242231
false ->
243-
case ets:insert_new(Commits, {Reference, From}) of
244-
true ->
245-
PendingCommits = atomics:add_get(Counters, ?RAFT_COMMIT_QUEUE_SIZE_COUNTER, 1),
246-
?RAFT_GATHER('raft.acceptor.commit.request.pending', PendingCommits),
247-
ok;
248-
false ->
249-
duplicate
250-
end
232+
PendingCommits = atomics:add_get(Counters, ?RAFT_COMMIT_QUEUE_SIZE_COUNTER, 1),
233+
?RAFT_GATHER('raft.acceptor.commit.request.pending', PendingCommits),
234+
ok
251235
end
252236
end.
253237

254-
% Fulfill a pending commit with the result of the application of the command contained
255-
% within the commit.
256-
-spec fulfill_commit(Queues :: queues(), term(), dynamic()) -> ok | not_found.
257-
fulfill_commit(#queues{counters = Counters, commits = Commits}, Reference, Reply) ->
258-
case ets:take(Commits, Reference) of
259-
[{Reference, From}] ->
260-
atomics:sub(Counters, ?RAFT_COMMIT_QUEUE_SIZE_COUNTER, 1),
261-
gen_server:reply(From, Reply);
262-
[] ->
263-
not_found
264-
end.
265238

266-
% Fulfill a pending commit with an error that indicates that the commit was not completed.
267-
-spec fulfill_incomplete_commit(Queues :: queues(), term(), wa_raft_acceptor:commit_error()) -> ok | not_found.
268-
fulfill_incomplete_commit(Queues, Reference, Error) ->
269-
fulfill_commit(Queues, Reference, Error).
239+
-spec commit_cancelled(Queues :: queues(), From :: gen_server:from(), Reason :: wa_raft_acceptor:commit_error() | undefined) -> ok.
240+
commit_cancelled(#queues{counters = Counters}, From, Reason) ->
241+
atomics:sub(Counters, ?RAFT_COMMIT_QUEUE_SIZE_COUNTER, 1),
242+
Reason =/= undefined andalso gen_server:reply(From, Reason),
243+
ok.
270244

271-
% Fulfill a pending commit with an error that indicates that the commit was not completed.
272-
-spec fulfill_all_commits(Queues :: queues(), wa_raft_acceptor:commit_error()) -> ok.
273-
fulfill_all_commits(#queues{counters = Counters, commits = Commits}, Reply) ->
274-
lists:foreach(
275-
fun ({Reference, _}) ->
276-
case ets:take(Commits, Reference) of
277-
[{Reference, From}] ->
278-
atomics:sub(Counters, ?RAFT_COMMIT_QUEUE_SIZE_COUNTER, 1),
279-
gen_server:reply(From, Reply);
280-
[] ->
281-
ok
282-
end
283-
end, ets:tab2list(Commits)).
245+
-spec commit_completed(Queues :: queues(), From :: gen_server:from(), Reply :: term()) -> ok.
246+
commit_completed(#queues{counters = Counters}, From, Reply) ->
247+
atomics:sub(Counters, ?RAFT_COMMIT_QUEUE_SIZE_COUNTER, 1),
248+
gen_server:reply(From, Reply),
249+
ok.
284250

285251
%%-------------------------------------------------------------------
286252
%% PENDING READ QUEUE API
@@ -398,23 +364,21 @@ init(
398364
partition = Partition,
399365
queue_name = Name,
400366
queue_counters = Counters,
401-
queue_commits = CommitsName,
402367
queue_reads = ReadsName
403368
}
404369
) ->
405370
process_flag(trap_exit, true),
406371

407-
?LOG_NOTICE("Queue[~p] starting for partition ~0p/~0p with read queue ~0p and commit queue ~0p",
408-
[Name, Table, Partition, ReadsName, CommitsName], #{domain => [whatsapp, wa_raft]}),
372+
?LOG_NOTICE("Queue[~p] starting for partition ~0p/~0p with read queue ~0p",
373+
[Name, Table, Partition, ReadsName], #{domain => [whatsapp, wa_raft]}),
409374

410375
% The queue process is the first process in the supervision for a single
411376
% RAFT partition. The supervisor is configured to restart all processes if
412377
% even a single process fails. Since the queue process is starting up, all
413378
% queues tracked should be empty so reset all counters.
414379
[atomics:put(Counters, Index, 0) || Index <- lists:seq(1, ?RAFT_NUMBER_OF_QUEUE_SIZE_COUNTERS)],
415380

416-
% Create ETS tables for pending commits and reads.
417-
CommitsName = ets:new(CommitsName, [set | ?RAFT_QUEUE_TABLE_OPTIONS]),
381+
% Create ETS table for pending reads.
418382
ReadsName = ets:new(ReadsName, [ordered_set | ?RAFT_QUEUE_TABLE_OPTIONS]),
419383

420384
{ok, #state{name = Name}}.

0 commit comments

Comments
 (0)