1313-export ([
1414 queues /1 ,
1515 queues /2 ,
16- commit_queue_size /1 ,
1716 commit_queue_size /2 ,
18- commit_queue_full / 1 ,
17+ commit_queue_size / 3 ,
1918 commit_queue_full /2 ,
19+ commit_queue_full /3 ,
2020 apply_queue_size /1 ,
2121 apply_queue_size /2 ,
2222 apply_queue_byte_size /1 ,
3535
3636% % PENDING COMMIT QUEUE API
3737-export ([
38- commit_started /1 ,
38+ commit_started /2 ,
3939 commit_cancelled /3 ,
4040 commit_completed /3
4141]).
8585-define (RAFT_QUEUE_TABLE_OPTIONS , [named_table , public , {read_concurrency , true }, {write_concurrency , true }]).
8686
8787% % Total number of counters for RAFT partition specfic counters
88- -define (RAFT_NUMBER_OF_QUEUE_SIZE_COUNTERS , 4 ).
88+ -define (RAFT_NUMBER_OF_QUEUE_SIZE_COUNTERS , 5 ).
8989% % Index into counter reference for counter tracking apply queue size
9090-define (RAFT_APPLY_QUEUE_SIZE_COUNTER , 1 ).
9191% % Index into counter reference for counter tracking apply total byte size
9292-define (RAFT_APPLY_QUEUE_BYTE_SIZE_COUNTER , 2 ).
93- % % Index into counter reference for counter tracking commit queue size
94- -define (RAFT_COMMIT_QUEUE_SIZE_COUNTER , 3 ).
93+ % % Index into counter reference for counter tracking high priority commit queue size
94+ -define (RAFT_HIGH_PRIORITY_COMMIT_QUEUE_SIZE_COUNTER , 3 ).
95+ % % Index into counter reference for counter tracking low priority commit queue size
96+ -define (RAFT_LOW_PRIORITY_COMMIT_QUEUE_SIZE_COUNTER , 4 ).
9597% % Index into counter reference for counter tracking read queue size
96- -define (RAFT_READ_QUEUE_SIZE_COUNTER , 4 ).
98+ -define (RAFT_READ_QUEUE_SIZE_COUNTER , 5 ).
9799
98100% %-------------------------------------------------------------------
99101% % INTERNAL TYPES
@@ -129,33 +131,37 @@ queues(Table, Partition) ->
129131 Options -> queues (Options )
130132 end .
131133
132- -spec commit_queue_size (Queues :: queues ()) -> non_neg_integer ().
133- commit_queue_size (# queues {counters = Counters }) ->
134- atomics :get (Counters , ? RAFT_COMMIT_QUEUE_SIZE_COUNTER ).
134+ -spec commit_queue_size (Queues :: queues (), Priority :: wa_raft_acceptor :priority ()) -> non_neg_integer ().
135+ commit_queue_size (# queues {counters = Counters }, high ) ->
136+ atomics :get (Counters , ? RAFT_HIGH_PRIORITY_COMMIT_QUEUE_SIZE_COUNTER );
137+ commit_queue_size (# queues {counters = Counters }, low ) ->
138+ atomics :get (Counters , ? RAFT_LOW_PRIORITY_COMMIT_QUEUE_SIZE_COUNTER ).
135139
136- -spec commit_queue_size (wa_raft :table (), wa_raft :partition ()) -> non_neg_integer ().
137- commit_queue_size (Table , Partition ) ->
140+ -spec commit_queue_size (Table :: wa_raft :table (), Partition :: wa_raft :partition (), Priority :: wa_raft_acceptor : priority ()) -> non_neg_integer ().
141+ commit_queue_size (Table , Partition , Priority ) ->
138142 case queues (Table , Partition ) of
139143 undefined -> 0 ;
140- Queue -> commit_queue_size (Queue )
144+ Queue -> commit_queue_size (Queue , Priority )
141145 end .
142146
143- -spec commit_queue_full (Queues :: queues ()) -> boolean ().
144- commit_queue_full (# queues {application = App , counters = Counters }) ->
145- atomics :get (Counters , ? RAFT_COMMIT_QUEUE_SIZE_COUNTER ) >= ? RAFT_MAX_PENDING_COMMITS (App ).
147+ -spec commit_queue_full (Queues :: queues (), Priority :: wa_raft_acceptor :priority ()) -> boolean ().
148+ commit_queue_full (# queues {application = App , counters = Counters }, high ) ->
149+ atomics :get (Counters , ? RAFT_HIGH_PRIORITY_COMMIT_QUEUE_SIZE_COUNTER ) >= ? RAFT_MAX_PENDING_HIGH_PRIORITY_COMMITS (App );
150+ commit_queue_full (# queues {application = App , counters = Counters }, low ) ->
151+ atomics :get (Counters , ? RAFT_LOW_PRIORITY_COMMIT_QUEUE_SIZE_COUNTER ) >= ? RAFT_MAX_PENDING_LOW_PRIORITY_COMMITS (App ).
146152
147- -spec commit_queue_full (wa_raft :table (), wa_raft :partition ()) -> boolean ().
148- commit_queue_full (Table , Partition ) ->
153+ -spec commit_queue_full (Table :: wa_raft :table (), Partition :: wa_raft :partition (), Priority :: wa_raft_acceptor : priority ()) -> boolean ().
154+ commit_queue_full (Table , Partition , Priority ) ->
149155 case queues (Table , Partition ) of
150156 undefined -> false ;
151- Queues -> commit_queue_full (Queues )
157+ Queues -> commit_queue_full (Queues , Priority )
152158 end .
153159
154160-spec apply_queue_size (Queues :: queues ()) -> non_neg_integer ().
155161apply_queue_size (# queues {counters = Counters }) ->
156162 atomics :get (Counters , ? RAFT_APPLY_QUEUE_SIZE_COUNTER ).
157163
158- -spec apply_queue_size (wa_raft :table (), wa_raft :partition ()) -> non_neg_integer ().
164+ -spec apply_queue_size (Table :: wa_raft :table (), Partition :: wa_raft :partition ()) -> non_neg_integer ().
159165apply_queue_size (Table , Partition ) ->
160166 case queues (Table , Partition ) of
161167 undefined -> 0 ;
@@ -219,17 +225,17 @@ registered_name(Table, Partition) ->
219225% % PENDING COMMIT QUEUE API
220226% %-------------------------------------------------------------------
221227
222- -spec commit_started (Queues :: queues ()) -> ok | apply_queue_full | commit_queue_full .
223- commit_started (# queues {counters = Counters } = Queues ) ->
224- case commit_queue_full (Queues ) of
228+ -spec commit_started (Queues :: queues (), Priority :: wa_raft_acceptor : priority () ) -> ok | apply_queue_full | commit_queue_full .
229+ commit_started (# queues {counters = Counters } = Queues , Priority ) ->
230+ case commit_queue_full (Queues , Priority ) of
225231 true ->
226232 commit_queue_full ;
227233 false ->
228234 case apply_queue_full (Queues ) of
229235 true ->
230236 apply_queue_full ;
231237 false ->
232- PendingCommits = atomics :add_get (Counters , ? RAFT_COMMIT_QUEUE_SIZE_COUNTER , 1 ),
238+ PendingCommits = atomics :add_get (Counters , ? RAFT_HIGH_PRIORITY_COMMIT_QUEUE_SIZE_COUNTER , 1 ),
233239 ? RAFT_GATHER ('raft.acceptor.commit.request.pending' , PendingCommits ),
234240 ok
235241 end
@@ -238,13 +244,13 @@ commit_started(#queues{counters = Counters} = Queues) ->
238244
239245-spec commit_cancelled (Queues :: queues (), From :: gen_server :from (), Reason :: wa_raft_acceptor :commit_error () | undefined ) -> ok .
240246commit_cancelled (# queues {counters = Counters }, From , Reason ) ->
241- atomics :sub (Counters , ? RAFT_COMMIT_QUEUE_SIZE_COUNTER , 1 ),
247+ atomics :sub (Counters , ? RAFT_HIGH_PRIORITY_COMMIT_QUEUE_SIZE_COUNTER , 1 ),
242248 Reason =/= undefined andalso gen_server :reply (From , Reason ),
243249 ok .
244250
245251-spec commit_completed (Queues :: queues (), From :: gen_server :from (), Reply :: term ()) -> ok .
246252commit_completed (# queues {counters = Counters }, From , Reply ) ->
247- atomics :sub (Counters , ? RAFT_COMMIT_QUEUE_SIZE_COUNTER , 1 ),
253+ atomics :sub (Counters , ? RAFT_HIGH_PRIORITY_COMMIT_QUEUE_SIZE_COUNTER , 1 ),
248254 gen_server :reply (From , Reply ),
249255 ok .
250256
0 commit comments