@@ -689,6 +689,7 @@ terminate(Reason, State, #raft_state{table = Table, partition = Partition} = Dat
689689 ? SERVER_LOG_NOTICE (State , Data , " terminating due to ~0P " , [Reason , 20 ]),
690690 wa_raft_durable_state :sync (Data ),
691691 wa_raft_info :delete_state (Table , Partition ),
692+ wa_raft_info :set_live (Table , Partition , false ),
692693 wa_raft_info :set_stale (Table , Partition , true ),
693694 ok .
694695
@@ -934,7 +935,7 @@ stalled(
934935 catch file :del_dir_r (Path )
935936 end ;
936937 false ->
937- ? SERVER_LOG_NOTICE (State0 , " at ~0p rejecting request to bootstrap with data." , [LastApplied , Index , Term ]),
938+ ? SERVER_LOG_NOTICE (State0 , " at ~0p rejecting request to bootstrap with data." , [LastApplied ]),
938939 {keep_state_and_data , {reply , From , {error , rejected }}}
939940 end ;
940941
@@ -1177,6 +1178,7 @@ leader(state_timeout = Type, Event, #raft_state{handover = {Peer, _, Timeout}} =
11771178 ? SERVER_LOG_NOTICE (State , " handover to ~0p times out." , [Peer ]),
11781179 {keep_state , State # raft_state {handover = undefined }, {next_event , Type , Event }};
11791180 false ->
1181+ check_leader_liveness (State ),
11801182 {keep_state_and_data , ? HEARTBEAT_TIMEOUT (State )}
11811183 end ;
11821184
@@ -1186,7 +1188,7 @@ leader(state_timeout, _, #raft_state{application = App} = State0) ->
11861188 true ->
11871189 State1 = append_entries_to_followers (State0 ),
11881190 State2 = apply_single_node_cluster (State1 ),
1189- check_leader_lagging (State2 ),
1191+ check_leader_liveness (State2 ),
11901192 {keep_state , State1 , ? HEARTBEAT_TIMEOUT (State2 )};
11911193 false ->
11921194 ? SERVER_LOG_NOTICE (State0 , " resigns from leadership because this node is ineligible." , []),
@@ -1679,7 +1681,7 @@ candidate(Type, Event, #raft_state{} = State) when is_tuple(Event), element(1, E
16791681
16801682% % [AppendEntries RPC] Switch to follower because current term now has a leader (5.2, 5.3)
16811683candidate (Type , ? REMOTE (Sender , ? APPEND_ENTRIES (_ , _ , _ , _ , _ )) = Event , # raft_state {} = State ) ->
1682- ? SERVER_LOG_NOTICE (State , " switching to follower after receiving heartbeat from ~0P ." , [Sender ]),
1684+ ? SERVER_LOG_NOTICE (State , " switching to follower after receiving heartbeat from ~0p ." , [Sender ]),
16831685 {next_state , follower_or_witness_state (State ), State , {next_event , Type , Event }};
16841686
16851687% % [RequestVote RPC] Candidates should ignore incoming vote requests as they always vote for themselves (5.2)
@@ -1850,7 +1852,7 @@ disabled(Type, Event, #raft_state{} = State) ->
18501852witness (enter , PreviousStateName , # raft_state {} = State ) ->
18511853 ? RAFT_COUNT ('raft.witness.enter' ),
18521854 ? SERVER_LOG_NOTICE (State , " becomes witness from state ~0p ." , [PreviousStateName ]),
1853- {keep_state , enter_state (? FUNCTION_NAME , State )};
1855+ {keep_state , enter_state (? FUNCTION_NAME , State ), ? ELECTION_TIMEOUT ( State ) };
18541856
18551857% % [Internal] Advance to newer term when requested
18561858witness (
@@ -1903,6 +1905,11 @@ witness(_, ?REMOTE(Candidate, ?REQUEST_VOTE(_, CandidateIndex, CandidateTerm)),
19031905witness (_ , ? REMOTE (_ , ? VOTE (_ )), # raft_state {}) ->
19041906 keep_state_and_data ;
19051907
1908+ % % [State Timeout] Check liveness, but do not restart state.
1909+ witness (state_timeout , _ , # raft_state {} = State ) ->
1910+ check_follower_liveness (? FUNCTION_NAME , State ),
1911+ {keep_state_and_data , ? ELECTION_TIMEOUT (State )};
1912+
19061913% % [Command] Defer to common handling for generic RAFT server commands
19071914witness (Type , ? RAFT_COMMAND (_ , _ ) = Event , # raft_state {} = State ) ->
19081915 command (? FUNCTION_NAME , Type , Event , State );
@@ -2620,44 +2627,28 @@ enter_state(State, #raft_state{table = Table, partition = Partition} = Data0) ->
26202627 Now = erlang :monotonic_time (millisecond ),
26212628 Data1 = Data0 # raft_state {state_start_ts = Now },
26222629 true = wa_raft_info :set_state (Table , Partition , State ),
2623- ok = check_stale_upon_entry (State , Now , Data1 ),
2630+ ok = check_stale_upon_entry (State , Data1 ),
26242631 Data1 .
26252632
2626- -spec check_stale_upon_entry (State :: state (), Now :: integer (), Data :: # raft_state {}) -> ok .
2627- % % Followers and candidates may be stale upon entry due to not receiving a timely heartbeat from an active leader.
2628- check_stale_upon_entry (
2629- State ,
2630- Now ,
2631- # raft_state {
2632- application = Application ,
2633- table = Table ,
2634- partition = Partition ,
2635- leader_heartbeat_ts = LeaderHeartbeatTs
2636- } = Data
2637- ) when State =:= follower ; State =:= candidate ->
2638- Stale = case LeaderHeartbeatTs of
2639- undefined ->
2640- ? SERVER_LOG_NOTICE (State , Data , " is stale upon entry due to having no prior leader heartbeat." , []),
2641- true ;
2642- _ ->
2643- Delay = Now - LeaderHeartbeatTs ,
2644- case Delay > ? RAFT_FOLLOWER_STALE_INTERVAL (Application ) of
2645- true ->
2646- ? SERVER_LOG_NOTICE (State , Data , " is stale upon entry because the last leader heartbeat was received ~0p ms ago." , [Delay ]),
2647- true ;
2648- false ->
2649- false
2650- end
2651- end ,
2652- true = wa_raft_info :set_stale (Table , Partition , Stale ),
2633+ -spec check_stale_upon_entry (State :: state (), Data :: # raft_state {}) -> ok .
2634+ % % Followers and candidates are live upon entry if they've received a timely heartbeat
2635+ % % and inherit their staleness from the previous state.
2636+ check_stale_upon_entry (State , Data ) when State =:= follower ; State =:= candidate ->
2637+ check_follower_liveness (State , Data );
2638+ % % Witnesses are live upon entry if they've received a timely heartbeat but are always stale.
2639+ check_stale_upon_entry (witness , # raft_state {table = Table , partition = Partition } = Data ) ->
2640+ check_follower_liveness (witness , Data ),
2641+ wa_raft_info :set_stale (Table , Partition , true ),
26532642 ok ;
2654- % % Leaders are never stale upon entry.
2655- check_stale_upon_entry (leader , _ , # raft_state {table = Table , partition = Partition }) ->
2656- true = wa_raft_info :set_stale (Table , Partition , false ),
2643+ % % Leaders are always live and never stale upon entry.
2644+ check_stale_upon_entry (leader , # raft_state {table = Table , partition = Partition }) ->
2645+ wa_raft_info :set_live (Table , Partition , true ),
2646+ wa_raft_info :set_stale (Table , Partition , false ),
26572647 ok ;
2658- % % Witness, stalled and disabled servers are always stale.
2659- check_stale_upon_entry (_ , _ , # raft_state {table = Table , partition = Partition }) ->
2660- true = wa_raft_info :set_stale (Table , Partition , true ),
2648+ % % Stalled and disabled servers are never live and always stale.
2649+ check_stale_upon_entry (_ , # raft_state {table = Table , partition = Partition }) ->
2650+ wa_raft_info :set_live (Table , Partition , false ),
2651+ wa_raft_info :set_stale (Table , Partition , true ),
26612652 ok .
26622653
26632654% % Set a new current term and voted-for peer and clear any state that is associated with the previous term.
@@ -3069,7 +3060,8 @@ handle_heartbeat(
30693060 _ ->
30703061 Data2
30713062 end ,
3072- check_follower_lagging (CommitIndex , Data3 ),
3063+ refresh_follower_liveness (State , Data3 ),
3064+ check_follower_lagging (State , CommitIndex , Data3 ),
30733065 case follower_or_witness_state (Data3 ) of
30743066 State ->
30753067 {keep_state , Data3 , ? ELECTION_TIMEOUT (Data3 )};
@@ -3345,9 +3337,55 @@ should_heartbeat(#raft_state{application = App, last_heartbeat_ts = LastHeartbea
33453337 Current = erlang :monotonic_time (millisecond ),
33463338 Current - Latest > ? RAFT_HEARTBEAT_INTERVAL (App ).
33473339
3348- % % Check follower/witness state due to log entry lag and change stale flag if needed
3349- -spec check_follower_lagging (LeaderCommit :: pos_integer (), State :: # raft_state {}) -> ok .
3340+ -spec refresh_follower_liveness (State :: state (), Data :: # raft_state {}) -> ok .
3341+ refresh_follower_liveness (State , # raft_state {table = Table , partition = Partition } = Data ) ->
3342+ case wa_raft_info :get_live (Table , Partition ) of
3343+ true ->
3344+ ok ;
3345+ false ->
3346+ ? SERVER_LOG_NOTICE (State , Data , " is live" , []),
3347+ wa_raft_info :set_live (Table , Partition , true )
3348+ end ,
3349+ ok .
3350+
3351+ % % Check the timestamp of a follower or witness's last received heartbeat to determine if
3352+ % % the replica is live.
3353+ -spec check_follower_liveness (State :: state (), Data :: # raft_state {}) -> ok .
3354+ check_follower_liveness (
3355+ State ,
3356+ # raft_state {
3357+ application = App ,
3358+ table = Table ,
3359+ partition = Partition ,
3360+ leader_heartbeat_ts = LeaderHeartbeatTs
3361+ } = Data
3362+ ) ->
3363+ NowTs = erlang :monotonic_time (millisecond ),
3364+ GracePeriod = ? RAFT_LIVENESS_GRACE_PERIOD_MS (App ),
3365+ Liveness = wa_raft_info :get_live (Table , Partition ),
3366+ Live = LeaderHeartbeatTs =/= undefined andalso LeaderHeartbeatTs + GracePeriod >= NowTs ,
3367+ case Live of
3368+ Liveness ->
3369+ ok ;
3370+ true ->
3371+ ? SERVER_LOG_NOTICE (State , Data , " is live" , []),
3372+ wa_raft_info :set_live (Table , Partition , true );
3373+ false ->
3374+ ? SERVER_LOG_NOTICE (State , Data , " is no longer live after last leader heartbeat at ~0p " , [LeaderHeartbeatTs ]),
3375+ wa_raft_info :set_live (Table , Partition , false ),
3376+ wa_raft_info :get_stale (Table , Partition ) =:= true andalso begin
3377+ ? SERVER_LOG_NOTICE (State , Data , " is now stale due to liveness" , []),
3378+ wa_raft_info :set_stale (Table , Partition , false )
3379+ end
3380+ end ,
3381+ ok .
3382+
3383+ % % Check the state of a follower or witness's last applied log entry versus the leader's
3384+ % % commit index to determine if the replica is lagging behind and adjust the partition's
3385+ % % staleness if needed.
3386+ -spec check_follower_lagging (State :: state (), LeaderCommit :: pos_integer (), State :: # raft_state {}) -> ok .
33503387check_follower_lagging (
3388+ State ,
33513389 LeaderCommit ,
33523390 # raft_state {
33533391 application = App ,
@@ -3358,24 +3396,30 @@ check_follower_lagging(
33583396) ->
33593397 Lagging = LeaderCommit - LastApplied ,
33603398 ? RAFT_GATHER ('raft.follower.lagging' , Lagging ),
3361- case Lagging < ? RAFT_FOLLOWER_STALE_ENTRIES (App ) of
3362- true ->
3363- wa_raft_info :get_stale (Table , Partition ) =/= false andalso begin
3364- ? SERVER_LOG_NOTICE (follower , Data , " catches up." , []),
3399+
3400+ % Witnesses are always considered stale and so do not re-check their staleness.
3401+ State =/= witness andalso begin
3402+ Stale = wa_raft_info :get_stale (Table , Partition ),
3403+ case Lagging >= ? RAFT_STALE_GRACE_PERIOD_ENTRIES (App ) of
3404+ Stale ->
3405+ ok ;
3406+ true ->
3407+ ? SERVER_LOG_NOTICE (State , Data , " last applied at ~0p is ~0p behind leader's commit at ~0p ." ,
3408+ [LastApplied , Lagging , LeaderCommit ]),
3409+ wa_raft_info :set_stale (Table , Partition , true );
3410+ false ->
3411+ ? SERVER_LOG_NOTICE (State , Data , " catches up." , []),
33653412 wa_raft_info :set_stale (Table , Partition , false )
3366- end ;
3367- false ->
3368- wa_raft_info :get_stale (Table , Partition ) =/= true andalso begin
3369- ? SERVER_LOG_NOTICE (follower , Data , " is far behind ~p (leader ~p , follower ~p )" ,
3370- [Lagging , LeaderCommit , LastApplied ]),
3371- wa_raft_info :set_stale (Table , Partition , true )
3372- end
3413+ end
33733414 end ,
3415+
33743416 ok .
33753417
3376- % % Check leader state and set stale if needed
3377- -spec check_leader_lagging (# raft_state {}) -> term ().
3378- check_leader_lagging (
3418+ % % As leader, compute the quorum of the most recent timestamps of follower's
3419+ % % acknowledgement of heartbeats and update the partition's staleness and
3420+ % % liveness when necessary.
3421+ -spec check_leader_liveness (# raft_state {}) -> term ().
3422+ check_leader_liveness (
33793423 # raft_state {
33803424 application = App ,
33813425 table = Table ,
@@ -3395,9 +3439,11 @@ check_leader_lagging(
33953439 ok ;
33963440 true ->
33973441 ? SERVER_LOG_NOTICE (leader , State , " is now stale due to last heartbeat quorum age being ~0p ms >= ~0p ms max" , [QuorumAge , MaxAge ]),
3442+ wa_raft_info :set_live (Table , Partition , true ),
33983443 wa_raft_info :set_stale (Table , Partition , true );
33993444 false ->
34003445 ? SERVER_LOG_NOTICE (leader , State , " is no longer stale after heartbeat quorum age drops to ~0p ms < ~0p ms max" , [QuorumAge , MaxAge ]),
3446+ wa_raft_info :set_live (Table , Partition , false ),
34013447 wa_raft_info :set_stale (Table , Partition , false )
34023448 end .
34033449
0 commit comments