@@ -1032,10 +1032,8 @@ leader(
10321032) when NewTerm > CurrentTerm ->
10331033 ? RAFT_COUNT ('raft.leader.advance_term' ),
10341034 ? RAFT_LOG_NOTICE (State0 , " advancing to new term ~0p ." , [NewTerm ]),
1035- % % Drop any pending log entries in the current batch before advancing
1036- State1 = cancel_pending ({error , not_leader }, State0 ),
1037- State2 = advance_term (? FUNCTION_NAME , NewTerm , undefined , State1 ),
1038- {next_state , follower_or_witness_state (State2 ), State2 };
1035+ State1 = advance_term (? FUNCTION_NAME , NewTerm , undefined , State0 ),
1036+ {next_state , follower_or_witness_state (State1 ), State1 };
10391037
10401038% % [Protocol] Parse any RPCs in network formats
10411039leader (Type , Event , # raft_state {} = State ) when is_tuple (Event ), element (1 , Event ) =:= rpc ->
@@ -1178,60 +1176,47 @@ leader(state_timeout, _, #raft_state{application = App} = State0) ->
11781176 {keep_state , State1 , ? HEARTBEAT_TIMEOUT (State2 )};
11791177 false ->
11801178 ? RAFT_LOG_NOTICE (State0 , " resigns from leadership because this node is ineligible." , []),
1181- % % Drop any pending log entries queued in the log view before resigning
1182- State1 = cancel_pending ({error , not_leader }, State0 ),
1183- State2 = clear_leader (? FUNCTION_NAME , State1 ),
1184- {next_state , follower_or_witness_state (State2 ), State2 }
1179+ State1 = clear_leader (? FUNCTION_NAME , State0 ),
1180+ {next_state , follower_or_witness_state (State1 ), State1 }
11851181 end ;
11861182
1187- % % [Commit] If a handover is in progress, then try to redirect to handover target
1188- leader (
1189- cast ,
1190- ? COMMIT_COMMAND (From , _ ),
1191- # raft_state {
1192- queues = Queues ,
1193- handover = {Peer , _ , _ }
1194- } = State
1195- ) ->
1196- ? RAFT_COUNT ('raft.commit.handover' ),
1197- wa_raft_queue :commit_cancelled (Queues , From , {error , {notify_redirect , Peer }}), % Optimistically redirect to handover peer
1198- {keep_state , State };
1199-
1200- % % [Commit] Otherwise, add a new commit to the RAFT log
1183+ % % [Commit]
1184+ % % Add a new commit request to the pending list. If the cluster consists only
1185+ % % of the local node, then immediately apply the commit. Otherwise, if a
1186+ % % handover is not in progress, then immediately append the pending list if
1187+ % % enough pending commit requests have accumulated.
12011188leader (
12021189 cast ,
12031190 ? COMMIT_COMMAND (From , Op ),
12041191 # raft_state {
12051192 application = App ,
1206- pending = Pending
1193+ pending = Pending ,
1194+ handover = Handover
12071195 } = State0
12081196) ->
1197+ % No size limit is imposed here as the pending queue cannot grow larger
1198+ % than the limit on the number of pending commits.
12091199 ? RAFT_COUNT ('raft.commit' ),
1210- State1 = apply_single_node_cluster (State0 # raft_state {pending = [{From , Op } | Pending ]}), % apply immediately for single node cluster
1211- PendingCount = length (State1 # raft_state .pending ),
1212- case ? RAFT_COMMIT_BATCH_INTERVAL (App ) > 0 andalso PendingCount =< ? RAFT_COMMIT_BATCH_MAX_ENTRIES (App ) of
1213- true ->
1214- ? RAFT_COUNT ('raft.commit.batch.delay' ),
1215- {keep_state , State1 , ? COMMIT_BATCH_TIMEOUT (State1 )};
1216- false ->
1217- State2 = append_entries_to_followers (State1 ),
1218- {keep_state , State2 , ? HEARTBEAT_TIMEOUT (State2 )}
1200+ State1 = State0 # raft_state {pending = [{From , Op } | Pending ]},
1201+ case Handover of
1202+ undefined ->
1203+ State2 = apply_single_node_cluster (State1 ),
1204+ PendingCount = length (State2 # raft_state .pending ),
1205+ case ? RAFT_COMMIT_BATCH_INTERVAL (App ) > 0 andalso PendingCount =< ? RAFT_COMMIT_BATCH_MAX_ENTRIES (App ) of
1206+ true ->
1207+ ? RAFT_COUNT ('raft.commit.batch.delay' ),
1208+ {keep_state , State2 , ? COMMIT_BATCH_TIMEOUT (State2 )};
1209+ false ->
1210+ State3 = append_entries_to_followers (State2 ),
1211+ {keep_state , State3 , ? HEARTBEAT_TIMEOUT (State3 )}
1212+ end ;
1213+ _ ->
1214+ {keep_state , State1 }
12191215 end ;
12201216
1221- % % [Strong Read] If a handover is in progress, then try to redirect to handover target
1222- leader (
1223- cast ,
1224- ? READ_COMMAND ({From , _ }),
1225- # raft_state {
1226- queues = Queues ,
1227- handover = {Peer , _ , _ }
1228- } = State
1229- ) ->
1230- ? RAFT_COUNT ('raft.read.handover' ),
1231- wa_raft_queue :fulfill_incomplete_read (Queues , From , {error , {notify_redirect , Peer }}), % Optimistically redirect to handover peer
1232- {keep_state , State };
1233-
1234- % % [Strong Read] Leader is eligible to serve strong reads.
1217+ % % [Strong Read]
1218+ % % For an incoming read request, record the effective index for which it must
1219+ % % be executed after.
12351220leader (
12361221 cast ,
12371222 ? READ_COMMAND ({From , Command }),
@@ -1262,10 +1247,8 @@ leader(
12621247% % [Resign] Leader resigns by switching to follower state.
12631248leader ({call , From }, ? RESIGN_COMMAND , # raft_state {} = State0 ) ->
12641249 ? RAFT_LOG_NOTICE (State0 , " resigns." , []),
1265- % % Drop any pending log entries queued in the log view before resigning
1266- State1 = cancel_pending ({error , not_leader }, State0 ),
1267- State2 = clear_leader (? FUNCTION_NAME , State1 ),
1268- {next_state , follower_or_witness_state (State2 ), State2 , {reply , From , ok }};
1250+ State1 = clear_leader (? FUNCTION_NAME , State0 ),
1251+ {next_state , follower_or_witness_state (State1 ), State1 , {reply , From , ok }};
12691252
12701253% % [Adjust Membership] Leader attempts to commit a single-node membership change.
12711254leader (
@@ -2567,12 +2550,15 @@ set_leader(
25672550 # raft_state {
25682551 table = Table ,
25692552 partition = Partition ,
2553+ storage = Storage ,
25702554 current_term = CurrentTerm
25712555 } = Data
25722556) ->
25732557 ? RAFT_LOG_NOTICE (State , Data , " changes leader to ~0p ." , [Node ]),
25742558 wa_raft_info :set_current_term_and_leader (Table , Partition , CurrentTerm , Node ),
2575- Data # raft_state {leader_id = Node }.
2559+ NewData = Data # raft_state {leader_id = Node , pending_read = false },
2560+ wa_raft_storage :cancel (Storage ),
2561+ cancel_pending ({error , not_leader }, NewData ).
25762562
25772563-spec clear_leader (state (), # raft_state {}) -> # raft_state {}.
25782564clear_leader (_ , # raft_state {leader_id = undefined } = Data ) ->
@@ -2584,12 +2570,11 @@ clear_leader(State, #raft_state{table = Table, partition = Partition, current_te
25842570
25852571% % Setup the RAFT state upon entry into a new RAFT server state.
25862572-spec enter_state (State :: state (), Data :: # raft_state {}) -> # raft_state {}.
2587- enter_state (State , # raft_state {table = Table , partition = Partition , storage = Storage } = Data0 ) ->
2573+ enter_state (State , # raft_state {table = Table , partition = Partition } = Data0 ) ->
25882574 Now = erlang :monotonic_time (millisecond ),
25892575 Data1 = Data0 # raft_state {state_start_ts = Now },
25902576 true = wa_raft_info :set_state (Table , Partition , State ),
25912577 ok = check_stale_upon_entry (State , Now , Data1 ),
2592- ok = wa_raft_storage :cancel (Storage ),
25932578 Data1 .
25942579
25952580-spec check_stale_upon_entry (State :: state (), Now :: integer (), Data :: # raft_state {}) -> ok .
@@ -2648,8 +2633,6 @@ advance_term(
26482633 current_term = NewTerm ,
26492634 voted_for = VotedFor ,
26502635 votes = #{},
2651- pending = [],
2652- pending_read = false ,
26532636 next_indices = #{},
26542637 match_indices = #{},
26552638 last_applied_indices = #{},
0 commit comments