4141
4242% % API
4343-export ([
44- start_catchup_request /5 ,
44+ start_catchup_request /6 ,
4545 cancel_catchup_request /2 ,
4646 is_catching_up /2
4747]).
8181
8282% % An entry in the catchup request ETS table representing a request to
8383% % trigger log catchup for a particular peer.
84- -define (CATCHUP_REQUEST (Peer , FollowerLastIndex , LeaderTerm , LeaderCommitIndex ), {Peer , FollowerLastIndex , LeaderTerm , LeaderCommitIndex }).
84+ -define (CATCHUP_REQUEST (Peer , FollowerLastIndex , LeaderTerm , LeaderCommitIndex , Witness ), {Peer , FollowerLastIndex , LeaderTerm , LeaderCommitIndex , Witness }).
8585
8686% % An entry in the catchup ETS table that indicates an in-progress log
8787% % catchup to the specified node.
@@ -116,9 +116,10 @@ start_link(#raft_options{log_catchup_name = Name} = Options) ->
116116
117117% % Submit a request to trigger log catchup for a particular follower starting at the index provided.
118118-spec start_catchup_request (Catchup :: atom (), Peer :: # raft_identity {}, FollowerLastIndex :: wa_raft_log :log_index (),
119- LeaderTerm :: wa_raft_log :log_term (), LeaderCommitIndex :: wa_raft_log :log_index ()) -> ok .
120- start_catchup_request (Catchup , Peer , FollowerLastIndex , LeaderTerm , LeaderCommitIndex ) ->
121- ets :insert (Catchup , ? CATCHUP_REQUEST (Peer , FollowerLastIndex , LeaderTerm , LeaderCommitIndex )),
119+ LeaderTerm :: wa_raft_log :log_term (), LeaderCommitIndex :: wa_raft_log :log_index (),
120+ Witness :: boolean ()) -> ok .
121+ start_catchup_request (Catchup , Peer , FollowerLastIndex , LeaderTerm , LeaderCommitIndex , Witness ) ->
122+ ets :insert (Catchup , ? CATCHUP_REQUEST (Peer , FollowerLastIndex , LeaderTerm , LeaderCommitIndex , Witness )),
122123 ok .
123124
124125% % Cancel a request to trigger log catchup for a particular follower.
@@ -206,8 +207,8 @@ handle_info(timeout, #state{name = Name} = State) ->
206207 {noreply , State , ? IDLE_TIMEOUT };
207208 Requests ->
208209 % Select a random log catchup request to process.
209- ? CATCHUP_REQUEST (Peer , FollowerLastIndex , LeaderTerm , LeaderCommitIndex ) = lists :nth (rand :uniform (length (Requests )), Requests ),
210- NewState = send_logs (Peer , FollowerLastIndex , LeaderTerm , LeaderCommitIndex , State ),
210+ ? CATCHUP_REQUEST (Peer , FollowerLastIndex , LeaderTerm , LeaderCommitIndex , Witness ) = lists :nth (rand :uniform (length (Requests )), Requests ),
211+ NewState = send_logs (Peer , FollowerLastIndex , LeaderTerm , LeaderCommitIndex , Witness , State ),
211212 {noreply , NewState , ? CONTINUE_TIMEOUT }
212213 end ;
213214handle_info (Info , # state {name = Name } = State ) ->
@@ -222,8 +223,8 @@ terminate(_Reason, #state{name = Name}) ->
222223% % Private functions - Send logs to follower
223224% %
224225
225- -spec send_logs (# raft_identity {}, wa_raft_log :log_index (), wa_raft_log :log_term (), wa_raft_log :log_index (), # state {}) -> # state {}.
226- send_logs (Peer , NextLogIndex , LeaderTerm , LeaderCommitIndex , # state {name = Name , lockouts = Lockouts } = State ) ->
226+ -spec send_logs (# raft_identity {}, wa_raft_log :log_index (), wa_raft_log :log_term (), wa_raft_log :log_index (), boolean (), # state {}) -> # state {}.
227+ send_logs (Peer , NextLogIndex , LeaderTerm , LeaderCommitIndex , Witness , # state {name = Name , lockouts = Lockouts } = State ) ->
227228 StartMillis = erlang :system_time (millisecond ),
228229 LockoutMillis = maps :get (Peer , Lockouts , 0 ),
229230 NewState = case LockoutMillis =< StartMillis of
@@ -233,7 +234,7 @@ send_logs(Peer, NextLogIndex, LeaderTerm, LeaderCommitIndex, #state{name = Name,
233234 true ->
234235 counters :add (Counters , ? COUNTER_CONCURRENT_CATCHUP , 1 ),
235236 ets :insert (? MODULE , ? CATCHUP_RECORD (Name , Peer )),
236- try send_logs_impl (Peer , NextLogIndex , LeaderTerm , LeaderCommitIndex , State ) catch
237+ try send_logs_impl (Peer , NextLogIndex , LeaderTerm , LeaderCommitIndex , Witness , State ) catch
237238 T :E :S ->
238239 ? RAFT_COUNT ('raft.catchup.error' ),
239240 ? LOG_ERROR (" Catchup[~p , term ~p ] bulk logs transfer to ~0p failed with ~0p ~0p at ~p " ,
@@ -256,15 +257,22 @@ send_logs(Peer, NextLogIndex, LeaderTerm, LeaderCommitIndex, #state{name = Name,
256257 ets :delete (? MODULE , Name ),
257258 NewState .
258259
259- -spec send_logs_impl (# raft_identity {}, wa_raft_log :log_index (), wa_raft_log :log_term (), wa_raft_log :log_index (), # state {}) -> term ().
260- send_logs_impl (# raft_identity {node = PeerNode } = Peer , NextLogIndex , LeaderTerm , LeaderCommitIndex ,
260+ -spec send_logs_impl (# raft_identity {}, wa_raft_log :log_index (), wa_raft_log :log_term (), wa_raft_log :log_index (), boolean (), # state {}) -> term ().
261+ send_logs_impl (# raft_identity {node = PeerNode } = Peer , NextLogIndex , LeaderTerm , LeaderCommitIndex , Witness ,
261262 # state {application = App , name = Name , self = Self , identifier = Identifier , distribution_module = DistributionModule , server_name = Server , log = Log } = State ) ->
262263 PrevLogIndex = NextLogIndex - 1 ,
263264 {ok , PrevLogTerm } = wa_raft_log :term (Log , PrevLogIndex ),
264265
265266 LogBatchEntries = ? RAFT_CATCHUP_MAX_ENTRIES_PER_BATCH (App ),
266267 LogBatchBytes = ? RAFT_CATCHUP_MAX_BYTES_PER_BATCH (App ),
267- {ok , Entries } = wa_raft_log :entries (Log , NextLogIndex , LogBatchEntries , LogBatchBytes ),
268+ Entries = case Witness of
269+ true ->
270+ {ok , RawEntries } = wa_raft_log :get (Log , NextLogIndex , LogBatchEntries , LogBatchBytes ),
271+ wa_raft_server :stub_entries_for_witness (RawEntries );
272+ false ->
273+ {ok , RawEntries } = wa_raft_log :entries (Log , NextLogIndex , LogBatchEntries , LogBatchBytes ),
274+ RawEntries
275+ end ,
268276
269277 case Entries of
270278 [] ->
@@ -278,7 +286,7 @@ send_logs_impl(#raft_identity{node = PeerNode} = Peer, NextLogIndex, LeaderTerm,
278286
279287 try wa_raft_server :parse_rpc (Self , DistributionModule :call (Dest , Identifier , Command , Timeout )) of
280288 {LeaderTerm , _ , ? APPEND_ENTRIES_RESPONSE (PrevLogIndex , true , FollowerMatchIndex , _ )} ->
281- send_logs_impl (Peer , FollowerMatchIndex + 1 , LeaderTerm , LeaderCommitIndex , State );
289+ send_logs_impl (Peer , FollowerMatchIndex + 1 , LeaderTerm , LeaderCommitIndex , Witness , State );
282290 {LeaderTerm , _ , ? APPEND_ENTRIES_RESPONSE (PrevLogIndex , false , _FollowerLastIndex , _ )} ->
283291 exit (append_failed );
284292 {LeaderTerm , _ , Other } ->
0 commit comments