Skip to content

Commit 9158c77

Browse files
patelarjavmeta-codesync[bot]
authored andcommitted
Add priority to commit API
Summary: The follow stack of diffs will implement priority requests (as per [this](https://docs.google.com/document/d/181je6C5Iwmi5Ckl87UPLOug2V0eLVp8jIM2DoDlWbpo/edit?tab=t.0#heading=h.9l8mmncnncc8) doc). --- This diff modifies the commit API in `wa_raft_acceptor` to take in a priority. It is a no-op, the priority isn't used anywhere. By default, we set priorities to be `high`. Reviewed By: jaher Differential Revision: D82851320 fbshipit-source-id: fb5c9ea12dd8b1cc906855a3a720ee40a54ec756
1 parent defc06c commit 9158c77

File tree

1 file changed

+23
-7
lines changed

1 file changed

+23
-7
lines changed

src/wa_raft_acceptor.erl

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
-export([
2020
commit/2,
2121
commit/3,
22+
commit/4,
2223
commit_async/3,
24+
commit_async/4,
2325
read/2,
2426
read/3
2527
]).
@@ -42,7 +44,8 @@
4244
command/0,
4345
key/0,
4446
op/0,
45-
read_op/0
47+
read_op/0,
48+
priority/0
4649
]).
4750

4851
-export_type([
@@ -68,6 +71,7 @@
6871
-type key() :: term().
6972
-type op() :: {Key :: key(), Command :: command()}.
7073
-type read_op() :: {From :: gen_server:from(), Command :: command()}.
74+
-type priority() :: high | low.
7175

7276
-type call_error_type() :: timeout | unreachable | {call_error, Reason :: term()}.
7377
-type call_error() :: {error, call_error_type()}.
@@ -78,8 +82,8 @@
7882
-type read_error() :: {error, read_error_type()}.
7983
-type read_result() :: Result :: dynamic() | Error :: read_error() | call_error().
8084

81-
-type commit_request() :: {commit, Op :: op()}.
82-
-type commit_async_request() :: {commit, From :: gen_server:from(), Op :: op()}.
85+
-type commit_request() :: {commit, Op :: op()} | {commit, Op :: op(), Priority :: priority()}.
86+
-type commit_async_request() :: {commit, From :: gen_server:from(), Op :: op()} | {commit, From :: gen_server:from(), Op :: op(), Priority :: priority()}.
8387
-type commit_error_type() ::
8488
not_leader |
8589
{commit_queue_full, Key :: key()} |
@@ -139,10 +143,18 @@ commit(ServerRef, Op) ->
139143
commit(ServerRef, Op, Timeout) ->
140144
call(ServerRef, {commit, Op}, Timeout).
141145

146+
-spec commit(ServerRef :: gen_server:server_ref(), Op :: op(), Timeout :: timeout(), Priority :: priority()) -> commit_result().
147+
commit(ServerRef, Op, Timeout, Priority) ->
148+
call(ServerRef, {commit, Op, Priority}, Timeout).
149+
142150
-spec commit_async(ServerRef :: gen_server:server_ref(), From :: {pid(), term()}, Op :: op()) -> ok.
143151
commit_async(ServerRef, From, Op) ->
144152
gen_server:cast(ServerRef, {commit, From, Op}).
145153

154+
-spec commit_async(ServerRef :: gen_server:server_ref(), From :: {pid(), term()}, Op :: op(), Priority :: priority()) -> ok.
155+
commit_async(ServerRef, From, Op, Priority) ->
156+
gen_server:cast(ServerRef, {commit, From, Op, Priority}).
157+
146158
% Strong-read
147159
-spec read(ServerRef :: gen_server:server_ref(), Command :: command()) -> read_result().
148160
read(ServerRef, Command) ->
@@ -207,7 +219,9 @@ handle_call({read, Command}, From, State) ->
207219
{error, _} = Error -> {reply, Error, State}
208220
end;
209221
handle_call({commit, Op}, From, State) ->
210-
case commit_impl(From, Op, State) of
222+
?MODULE:handle_call({commit, Op, high}, From, State);
223+
handle_call({commit, Op, Priority}, From, State) ->
224+
case commit_impl(From, Op, Priority, State) of
211225
continue -> {noreply, State};
212226
{error, _} = Error -> {reply, Error, State}
213227
end;
@@ -217,7 +231,9 @@ handle_call(Request, From, #state{name = Name} = State) ->
217231

218232
-spec handle_cast(commit_async_request(), #state{}) -> {noreply, #state{}}.
219233
handle_cast({commit, From, Op}, State) ->
220-
Result = commit_impl(From, Op, State),
234+
?MODULE:handle_cast({commit, From, Op, high}, State);
235+
handle_cast({commit, From, Op, Priority}, State) ->
236+
Result = commit_impl(From, Op, Priority, State),
221237
Result =/= continue andalso gen_server:reply(From, Result),
222238
{noreply, State};
223239
handle_cast(Request, #state{name = Name} = State) ->
@@ -234,8 +250,8 @@ terminate(Reason, #state{name = Name}) ->
234250
%%-------------------------------------------------------------------
235251

236252
%% Enqueue a commit.
237-
-spec commit_impl(From :: gen_server:from(), Request :: op(), State :: #state{}) -> continue | commit_error().
238-
commit_impl(From, {Key, _} = Op, #state{name = Name, server = Server, queues = Queues}) ->
253+
-spec commit_impl(From :: gen_server:from(), Request :: op(), Priority :: priority(), State :: #state{}) -> continue | commit_error().
254+
commit_impl(From, {Key, _} = Op, _Priority, #state{name = Name, server = Server, queues = Queues}) ->
239255
StartT = os:timestamp(),
240256
try
241257
?RAFT_LOG_DEBUG("Acceptor[~0p] starts to handle commit of ~0P from ~0p.", [Name, Op, 30, From]),

0 commit comments

Comments
 (0)