Skip to content

Commit 4838b41

Browse files
hsun324 and facebook-github-bot
authored and committed
Move partial append truncation logic to server
Summary: Simplify log append logic by moving handling of truncation (which was previously duplicated) to the RAFT server. This also fixes a potential issue where a heartbeat with partially matching log terms can cause the RAFT log implementation to attempt to truncate log entries past the RAFT server local commit index which the RAFT log is not aware of. Reviewed By: jaher Differential Revision: D75886646 fbshipit-source-id: 45edeb6e8be24d604640aed3ecf551385c51b252
1 parent 79a7026 commit 4838b41

File tree

2 files changed

+104
-100
lines changed

2 files changed

+104
-100
lines changed

src/wa_raft_log.erl

Lines changed: 42 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
-export([
2121
append/2,
2222
append/3,
23-
append_at/3
23+
check_heartbeat/3
2424
]).
2525

2626
%% APIs for accessing log data
@@ -249,7 +249,9 @@
249249

250250
%% Truncate the RAFT log to the given position so that all log entries
251251
%% including and after the provided index are completely deleted from
252-
%% the RAFT log.
252+
%% the RAFT log. If the truncation failed but the log state was not
253+
%% changed, then an error can be returned. Otherwise, an error should
254+
%% be raised.
253255
-callback truncate(Log :: log(), Index :: log_index(), State :: term()) -> {ok, NewState :: term()} | error().
254256

255257
%% Optionally, trim the RAFT log up to the given index.
@@ -301,77 +303,66 @@ append(View, Entries) ->
301303
-spec append(View :: view(), Entries :: [log_entry()], Mode :: strict | relaxed) ->
302304
{ok, NewView :: view()} | skipped | wa_raft:error().
303305
append(#log_view{last = Last} = View, Entries, Mode) ->
306+
?RAFT_COUNT('raft.log.append'),
304307
Provider = provider(View),
305308
case Provider:append(View, Entries, Mode) of
306309
ok ->
310+
?RAFT_COUNT('raft.log.append.ok'),
307311
{ok, refresh_config(View#log_view{last = Last + length(Entries)})};
308312
skipped when Mode =:= relaxed ->
313+
?RAFT_COUNT('raft.log.append.skipped'),
309314
skipped;
310315
{error, Reason} ->
316+
?RAFT_COUNT('raft.log.append.error'),
311317
{error, Reason}
312318
end.
313319

314-
%% Append the provided log entries to the log at the specified starting position.
315-
%% If this provided starting position is past the end of the log and appending
316-
%% would produce a gap, then fail. If the provided start position is located
317-
%% in the middle of the log, then the terms of the provided log entries will be
318-
%% compared with the already existing log entries. If there is any mismatch, then
319-
%% all log entries after the mismatching log will be replaced with the new log
320-
%% entries provided.
321-
-spec append_at(View :: view(), Start :: log_index(), Entries :: [log_entry()]) ->
322-
{ok, MatchIndex :: log_index(), NewView :: view()} | wa_raft:error().
323-
append_at(#log_view{last = Last}, Start, _Entries) when Start =< 0; Start > Last + 1 ->
324-
{error, invalid_start_index};
325-
append_at(#log_view{log = Log, last = Last} = View0, Start, Entries) ->
326-
?RAFT_COUNT('raft.log.append'),
320+
%% Compare the provided heartbeat log entries to the local log at the provided
321+
%% starting position in preparation for an append operation:
322+
%% * If the provided starting position is before the start of the log or past
323+
%% the end of the log, the comparison will fail with an `out_of_range`
324+
%% error.
325+
%% * If there is a conflict between the provided heartbeat log entries and any
326+
%% local log entries due to a term mismatch, then the comparison will fail
327+
%% with a `conflict` tuple that contains the log index of the first log
328+
%% entry with a conflicting term and the list containing the corresponding
329+
%% heartbeat log entry and all subsequent heartbeat log entries.
330+
%% * Otherwise, the comparison will succeed. Any new log entries not already
331+
%% in the local log will be returned.
332+
-spec check_heartbeat(View :: view(), Start :: log_index(), Entries :: [log_entry()]) ->
333+
{ok, NewEntries :: [log_entry()]} |
334+
{conflict, ConflictIndex :: log_index(), NewEntries :: [log_entry()]} |
335+
{error, out_of_range | {missing, MissingIndex :: log_index()}} |
336+
wa_raft:error().
337+
check_heartbeat(#log_view{first = First, last = Last}, Start, _Entries) when Start =< 0; Start < First; Start > Last ->
338+
{error, out_of_range};
339+
check_heartbeat(#log_view{log = Log, last = Last}, Start, Entries) ->
327340
Provider = provider(Log),
328341
End = Start + length(Entries) - 1,
329-
try Provider:fold_terms(Log, Start, End, fun append_validate/3, {Start, Entries}) of
330-
{ok, {_, []}} ->
331-
{ok, End, View0};
342+
try Provider:fold_terms(Log, Start, End, fun check_heartbeat_terms/3, {Start, Entries}) of
343+
% The fold should not terminate early if the provider is well-behaved.
344+
{ok, {Next, []}} when Next =:= End + 1 ->
345+
{ok, []};
332346
{ok, {Next, NewEntries}} when Next =:= Last + 1 ->
333-
case Provider:append(View0, NewEntries, strict) of
334-
ok ->
335-
?RAFT_COUNT('raft.log.append.ok'),
336-
{ok, End, refresh_config(View0#log_view{last = max(Last, End)})};
337-
{error, Reason} ->
338-
?RAFT_COUNT('raft.log.append.error'),
339-
{error, Reason}
340-
end;
347+
{ok, NewEntries};
341348
{error, Reason} ->
342-
?RAFT_COUNT('raft.log.append.fold.error'),
349+
?RAFT_COUNT('raft.log.heartbeat.error'),
343350
{error, Reason}
344351
catch
345-
throw:{mismatch, Next, NewEntries} when Next >= Start ->
346-
?RAFT_COUNT('raft.log.append.mismatch'),
347-
case truncate(View0, Next) of
348-
{ok, View1} ->
349-
case Provider:append(View1, NewEntries, strict) of
350-
ok ->
351-
?RAFT_COUNT('raft.log.append.ok'),
352-
{ok, End, refresh_config(View1#log_view{last = End})};
353-
{error, Reason} ->
354-
?RAFT_COUNT('raft.log.append.error'),
355-
{error, Reason}
356-
end;
357-
{error, Reason} ->
358-
?RAFT_COUNT('raft.log.append.truncate.error'),
359-
{error, Reason}
360-
end;
352+
throw:{conflict, ConflictIndex, ConflictEntries} ->
353+
{conflict, ConflictIndex, ConflictEntries};
361354
throw:{missing, Index} ->
362-
?RAFT_COUNT('raft.log.append.corruption'),
355+
?RAFT_COUNT('raft.log.heartbeat.corruption'),
363356
{error, {missing, Index}}
364357
end.
365358

366-
-spec append_validate(Index :: wa_raft_log:log_index(), Term :: wa_raft_log:log_term(), Acc) -> Acc
359+
-spec check_heartbeat_terms(Index :: wa_raft_log:log_index(), Term :: wa_raft_log:log_term(), Acc) -> Acc
367360
when Acc :: {Next :: wa_raft_log:log_index(), Entries :: [log_entry()]}.
368-
append_validate(Index, Term, {Index, [{Term, _} | Entries]}) ->
361+
check_heartbeat_terms(Index, Term, {Index, [{Term, _} | Entries]}) ->
369362
{Index + 1, Entries};
370-
append_validate(Index, _, {Index, [_ | _] = Entries}) ->
371-
throw({mismatch, Index, Entries});
372-
append_validate(StoredIndex, StoredTerm, {Index, [_ | Entries]}) when Index < StoredIndex ->
373-
append_validate(StoredIndex, StoredTerm, {Index + 1, Entries});
374-
append_validate(_, _, {Index, [_ | _]}) ->
363+
check_heartbeat_terms(Index, _, {Index, [_ | _] = Entries}) ->
364+
throw({conflict, Index, Entries});
365+
check_heartbeat_terms(_, _, {Index, [_ | _]}) ->
375366
throw({missing, Index}).
376367

377368
%%-------------------------------------------------------------------

src/wa_raft_server.erl

Lines changed: 62 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -2646,11 +2646,9 @@ commit_pending(#raft_state{log_view = View} = Data0) ->
26462646
skipped ->
26472647
% Since the append failed, we roll back the label state to before
26482648
% we constructed the entries.
2649-
?RAFT_COUNT('raft.server.sync.skipped'),
26502649
?RAFT_LOG_WARNING(leader, Data0, "skipped pre-heartbeat sync for ~0p log entr(ies).", [length(Entries)]),
26512650
cancel_pending({error, commit_stalled}, Data0);
26522651
{error, Error} ->
2653-
?RAFT_COUNT('raft.server.sync.error'),
26542652
?RAFT_LOG_ERROR(leader, Data0, "sync failed due to ~0P.", [Error, 20]),
26552653
error(Error)
26562654
end.
@@ -2679,7 +2677,6 @@ collect_pending(#raft_state{pending = Pending} = Data) ->
26792677
cancel_pending(_, #raft_state{pending = []} = Data) ->
26802678
Data;
26812679
cancel_pending(Reason, #raft_state{queues = Queues, pending = Pending} = Data) ->
2682-
?RAFT_COUNT('raft.commit.batch.cancel'),
26832680
% Pending commits are kept in reverse order.
26842681
[wa_raft_queue:fulfill_incomplete_commit(Queues, Reference, Reason) || {Reference, _} <- lists:reverse(Pending)],
26852682
Data#raft_state{pending = []}.
@@ -2966,60 +2963,76 @@ append_entries(
29662963
Entries,
29672964
EntryCount,
29682965
#raft_state{
2969-
log_view = View,
2966+
log_view = View0,
29702967
commit_index = CommitIndex,
29712968
current_term = CurrentTerm,
29722969
leader_id = LeaderId
29732970
} = Data
29742971
) ->
2975-
% Inspect the locally stored term associated with the previous log entry to discern if
2976-
% appending the provided range of log entries is allowed.
2977-
case wa_raft_log:term(View, PrevLogIndex) of
2978-
{ok, PrevLogTerm} ->
2979-
% If the term of the log entry previous the entries to be applied matches the term stored
2980-
% with the previous log entry in the local RAFT log, then this follower can proceed with
2981-
% appending to the log.
2982-
{ok, NewMatchIndex, NewView} = wa_raft_log:append_at(View, PrevLogIndex + 1, Entries),
2983-
{ok, true, NewMatchIndex, Data#raft_state{log_view = NewView}};
2984-
{ok, LocalPrevLogTerm} ->
2985-
% If the term of the log entry proceeding the entries to be applied does not match the log
2986-
% entry stored with the previous log entry in the local RAFT log, then we need to truncate
2987-
% the log because there is a mismatch between this follower and the leader of the cluster.
2988-
?RAFT_COUNT({raft, State, 'heartbeat.skip.log_term_mismatch'}),
2989-
?RAFT_LOG_WARNING(State, Data, "rejects appending ~0p log entries in range ~0p to ~0p as previous log entry ~0p has term ~0p locally when leader ~0p expects it to have term ~0p.",
2990-
[EntryCount, PrevLogIndex + 1, PrevLogIndex + EntryCount, PrevLogIndex, LocalPrevLogTerm, LeaderId, PrevLogTerm]),
2991-
case PrevLogIndex =< CommitIndex of
2992-
true ->
2993-
% We cannot validly delete log entries that have already been committed because doing
2994-
% so means that we are erasing log entries that may be part of the minimum quorum. If
2995-
% we try to do so, then disable this partition as we've violated a critical invariant.
2996-
?RAFT_COUNT({raft, State, 'heartbeat.error.corruption.excessive_truncation'}),
2997-
?RAFT_LOG_WARNING(State, Data, "fails as progress requires truncation of log entry at ~0p due to log mismatch when log entries up to ~0p were already committed.",
2998-
[PrevLogIndex, CommitIndex]),
2999-
{fatal,
3000-
lists:flatten(
3001-
io_lib:format("Leader ~0p of term ~0p requested truncation of log entry at ~0p due to log term mismatch (local ~0p, leader ~0p) when log entries up to ~0p were already committed.",
3002-
[LeaderId, CurrentTerm, PrevLogIndex, LocalPrevLogTerm, PrevLogTerm, CommitIndex]))};
3003-
false ->
3004-
% We are not deleting already applied log entries, so proceed with truncation.
3005-
?RAFT_LOG_NOTICE(State, Data, "Server[~0p, term ~0p, ~0p] truncating local log ending at ~0p to past ~0p due to log mismatch.",
3006-
[wa_raft_log:last_index(View), PrevLogIndex]),
3007-
{ok, NewView} = wa_raft_log:truncate(View, PrevLogIndex),
3008-
{ok, false, wa_raft_log:last_index(NewView), Data#raft_state{log_view = NewView}}
2972+
% Compare the incoming heartbeat with the local log to determine what
2973+
% actions need to be taken as part of handling this heartbeat.
2974+
case wa_raft_log:check_heartbeat(View0, PrevLogIndex, [{PrevLogTerm, undefined} | Entries]) of
2975+
{ok, []} ->
2976+
% No append is required as all the log entries in the heartbeat
2977+
% are already in the local log.
2978+
{ok, true, PrevLogIndex + EntryCount, Data};
2979+
{ok, NewEntries} ->
2980+
% No conflicting log entries were found in the heartbeat, but the
2981+
% heartbeat does contain new log entries to be appended to the end
2982+
% of the log.
2983+
{ok, View1} = wa_raft_log:append(View0, NewEntries),
2984+
{ok, true, PrevLogIndex + EntryCount, Data#raft_state{log_view = View1}};
2985+
{conflict, ConflictIndex, [{ConflictTerm, _} | _]} when ConflictIndex =< CommitIndex ->
2986+
% A conflict is detected that would result in the truncation of a
2987+
% log entry that the local replica has committed. We cannot validly
2988+
% delete log entries that are already committed because doing so
2989+
% may potentially cause the log entry to be no longer present on a
2990+
% majority of replicas.
2991+
{ok, LocalTerm} = wa_raft_log:term(View0, ConflictIndex),
2992+
?RAFT_COUNT({raft, State, 'heartbeat.error.corruption.excessive_truncation'}),
2993+
?RAFT_LOG_WARNING(State, Data, "refuses heartbeat at ~0p to ~0p that requires truncation past ~0p (term ~0p vs ~0p) when log entries up to ~0p are already committed.",
2994+
[PrevLogIndex, PrevLogIndex + EntryCount, ConflictIndex, ConflictTerm, LocalTerm, CommitIndex]),
2995+
Fatal = io_lib:format("A heartbeat at ~0p to ~0p from ~0p in term ~0p required truncating past ~0p (term ~0p vs ~0p) when log entries up to ~0p were already committed.",
2996+
[PrevLogIndex, PrevLogIndex + EntryCount, LeaderId, CurrentTerm, ConflictIndex, ConflictTerm, LocalTerm, CommitIndex]),
2997+
{fatal, lists:flatten(Fatal)};
2998+
{conflict, ConflictIndex, NewEntries} when ConflictIndex >= PrevLogIndex ->
2999+
% A truncation is required as there is a conflict between the local
3000+
% log and the incoming heartbeat.
3001+
?RAFT_LOG_NOTICE(State, Data, "handling heartbeat at ~0p by truncating local log ending at ~0p to past ~0p.",
3002+
[PrevLogIndex, wa_raft_log:last_index(View0), ConflictIndex]),
3003+
case wa_raft_log:truncate(View0, ConflictIndex) of
3004+
{ok, View1} ->
3005+
case ConflictIndex =:= PrevLogIndex of
3006+
true ->
3007+
% If the conflict precedes the heartbeat's log
3008+
% entries then no append can be performed.
3009+
{ok, false, wa_raft_log:last_index(View1), Data#raft_state{log_view = View1}};
3010+
false ->
3011+
% Otherwise, we can replace the truncated log
3012+
% entries with those from the current heartbeat.
3013+
{ok, View2} = wa_raft_log:append(View1, NewEntries),
3014+
{ok, true, PrevLogIndex + EntryCount, Data#raft_state{log_view = View2}}
3015+
end;
3016+
{error, Reason} ->
3017+
?RAFT_COUNT({raft, State, 'heartbeat.truncate.error'}),
3018+
?RAFT_LOG_WARNING(State, Data, "fails to truncate past ~0p while handling heartbeat at ~0p to ~0p due to ~0P",
3019+
[ConflictIndex, PrevLogIndex, PrevLogIndex + EntryCount, Reason, 30]),
3020+
{ok, false, wa_raft_log:last_index(View0), Data}
30093021
end;
3010-
not_found ->
3011-
% If the log entry is not found, then ignore and notify the leader of what log entry
3012-
% is required by this follower in the reply.
3013-
?RAFT_COUNT({raft, State, 'heartbeat.skip.missing_previous_log_entry'}),
3022+
{error, out_of_range} ->
3023+
% If the heartbeat is out of range (generally past the end of the
3024+
% log) then ignore and notify the leader of what log entry is
3025+
% required by this replica.
3026+
?RAFT_COUNT({raft, State, 'heartbeat.skip.out_of_range'}),
30143027
EntryCount =/= 0 andalso
3015-
?RAFT_LOG_WARNING(State, Data, "skips appending ~0p log entries in range ~0p to ~0p because previous log entry at ~0p is not available in local log covering ~0p to ~0p.",
3016-
[EntryCount, PrevLogIndex + 1, PrevLogIndex + EntryCount, PrevLogIndex, wa_raft_log:first_index(View), wa_raft_log:last_index(View)]),
3017-
{ok, false, wa_raft_log:last_index(View), Data};
3028+
?RAFT_LOG_WARNING(State, Data, "refuses out of range heartbeat at ~0p to ~0p with local log covering ~0p to ~0p.",
3029+
[PrevLogIndex, PrevLogIndex + EntryCount, wa_raft_log:first_index(View0), wa_raft_log:last_index(View0)]),
3030+
{ok, false, wa_raft_log:last_index(View0), Data};
30183031
{error, Reason} ->
3019-
?RAFT_COUNT({raft, State, 'heartbeat.skip.failed_to_read_previous_log_entry'}),
3020-
?RAFT_LOG_WARNING(State, Data, "skips appending ~0p log entries in range ~0p to ~0p because reading previous log entry at ~0p failed with error ~0P.",
3021-
[EntryCount, PrevLogIndex + 1, PrevLogIndex + EntryCount, PrevLogIndex, Reason, 30]),
3022-
{ok, false, wa_raft_log:last_index(View), Data}
3032+
?RAFT_COUNT({raft, State, 'heartbeat.skip.error'}),
3033+
?RAFT_LOG_WARNING(State, Data, "fails to check heartbeat at ~0p to ~0p for validity due to ~0P",
3034+
[PrevLogIndex, PrevLogIndex + EntryCount, Reason, 30]),
3035+
{ok, false, wa_raft_log:last_index(View0), Data}
30233036
end.
30243037

30253038
%%------------------------------------------------------------------------------

0 commit comments

Comments
 (0)