@@ -1081,7 +1081,7 @@ leader(
10811081 HeartbeatResponse1 = HeartbeatResponse0 #{FollowerId => erlang :monotonic_time (millisecond )},
10821082 State1 = State0 # raft_state {heartbeat_response_ts = HeartbeatResponse1 },
10831083
1084- case select_follower_replication_mode (FollowerMatchIndex , State1 ) of
1084+ case select_follower_replication_mode (FollowerMatchIndex , FollowerLastAppliedIndex , State1 ) of
10851085 bulk_logs -> request_bulk_logs_for_follower (Sender , FollowerMatchIndex , State1 );
10861086 _ -> cancel_bulk_logs_for_follower (Sender , State1 )
10871087 end ,
@@ -1121,7 +1121,7 @@ leader(
11211121 ? SERVER_LOG_DEBUG (State0 , " at commit index ~0p failed append to ~0p whose log now ends at ~0p ." ,
11221122 [CommitIndex , Sender , FollowerEndIndex ]),
11231123
1124- select_follower_replication_mode (FollowerEndIndex , State0 ) =:= snapshot andalso
1124+ select_follower_replication_mode (FollowerEndIndex , FollowerLastAppliedIndex , State0 ) =:= snapshot andalso
11251125 request_snapshot_for_follower (FollowerId , State0 ),
11261126 cancel_bulk_logs_for_follower (Sender , State0 ),
11271127
@@ -3463,28 +3463,34 @@ check_leader_liveness(
34633463% % to discern what the best subsequent replication mode would be for this follower.
34643464-spec select_follower_replication_mode (
34653465 FollowerLastIndex :: wa_raft_log :log_index (),
3466+ FollowerLastAppliedIndex :: wa_raft_log :log_index () | undefined ,
34663467 State :: # raft_state {}
34673468) -> snapshot | bulk_logs | logs .
34683469select_follower_replication_mode (
34693470 FollowerLastIndex ,
3471+ FollowerLastAppliedIndex ,
34703472 # raft_state {
34713473 application = App ,
34723474 log_view = View ,
34733475 last_applied = LastAppliedIndex
34743476 }
34753477) ->
3476- BulkLogThreshold = ? RAFT_CATCHUP_THRESHOLD (App ),
3478+ BulkLogThreshold = ? RAFT_CATCHUP_BULK_LOG_THRESHOLD (App ),
3479+ ApplyBacklogThreshold = ? RAFT_CATCHUP_APPLY_BACKLOG_THRESHOLD (App ),
34773480 LeaderFirstIndex = wa_raft_log :first_index (View ),
34783481 if
34793482 % Snapshot is required if the follower is stalled (its last index is 0) or if
34803483 % the leader's log begins after the follower's last index (logs are missing).
3481- FollowerLastIndex =:= 0 -> snapshot ;
3482- LeaderFirstIndex > FollowerLastIndex -> snapshot ;
3484+ FollowerLastIndex =:= 0 -> snapshot ;
3485+ LeaderFirstIndex > FollowerLastIndex -> snapshot ;
3486+ % If the follower's apply backlog (last index minus last applied index) exceeds the threshold, send a snapshot; skipped when the last applied index is undefined.
3487+ FollowerLastAppliedIndex =/= undefined andalso
3488+ FollowerLastIndex - FollowerLastAppliedIndex > ApplyBacklogThreshold -> snapshot ;
34833489 % When the leader's last applied index is ahead of the follower's last index
34843490 % by more than the bulk log threshold, use bulk log catchup to close the gap.
3485- LastAppliedIndex - FollowerLastIndex > BulkLogThreshold -> bulk_logs ;
3491+ LastAppliedIndex - FollowerLastIndex > BulkLogThreshold -> bulk_logs ;
34863492 % Otherwise, replicate normally.
3487- true -> logs
3493+ true -> logs
34883494 end .
34893495
34903496% % Try to start a snapshot transport to a follower if the snapshot transport
0 commit comments