Skip to content

Commit 6b3c04e

Browse files
committed
Close stream connection in case of unexpected error from SAC coordinator
Calls to the stream SAC coordinator can fail for various reason (e.g. a timeout because of a network partition). The stream reader does not take into account what the SAC coordinator returns and moves on even in case of errors. This can lead to inconsistent state for SAC groups. This commit changes this behavior by handling unexpected errors from the SAC coordinator and closing the connection. The client is expected to reconnect. This is safer than risking inconsistent state. Fixes #14040
1 parent 52283d4 commit 6b3c04e

File tree

2 files changed

+70
-41
lines changed

2 files changed

+70
-41
lines changed

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727

2828
-opaque state() :: #?MODULE{}.
2929

30+
-type sac_error() :: partition_index_conflict | not_found.
31+
3032
-export_type([state/0,
3133
command/0]).
3234

@@ -50,7 +52,8 @@
5052
import_state/2,
5153
check_conf_change/1,
5254
list_nodes/1,
53-
state_enter/2
55+
state_enter/2,
56+
is_sac_error/1
5457
]).
5558
-export([make_purge_nodes/1,
5659
make_update_conf/1]).
@@ -78,6 +81,7 @@
7881
-define(DISCONNECTED_TIMEOUT_APP_KEY, stream_sac_disconnected_timeout).
7982
-define(DISCONNECTED_TIMEOUT_CONF_KEY, disconnected_timeout).
8083
-define(DISCONNECTED_TIMEOUT_MS, 60_000).
84+
-define(SAC_ERRORS, [partition_index_conflict, not_found]).
8185

8286
%% Single Active Consumer API
8387
-spec register_consumer(binary(),
@@ -87,7 +91,7 @@
8791
pid(),
8892
binary(),
8993
integer()) ->
90-
{ok, boolean()} | {error, term()}.
94+
{ok, boolean()} | {error, sac_error() | term()}.
9195
register_consumer(VirtualHost,
9296
Stream,
9397
PartitionIndex,
@@ -108,7 +112,7 @@ register_consumer(VirtualHost,
108112
binary(),
109113
pid(),
110114
integer()) ->
111-
ok | {error, term()}.
115+
ok | {error, sac_error() | term()}.
112116
unregister_consumer(VirtualHost,
113117
Stream,
114118
ConsumerName,
@@ -120,13 +124,15 @@ unregister_consumer(VirtualHost,
120124
connection_pid = ConnectionPid,
121125
subscription_id = SubscriptionId}).
122126

123-
-spec activate_consumer(binary(), binary(), binary()) -> ok.
127+
-spec activate_consumer(binary(), binary(), binary()) ->
128+
ok | {error, sac_error() | term()}.
124129
activate_consumer(VH, Stream, Name) ->
125130
process_command(#command_activate_consumer{vhost =VH,
126131
stream = Stream,
127132
consumer_name= Name}).
128133

129-
-spec connection_reconnected(connection_pid()) -> ok.
134+
-spec connection_reconnected(connection_pid()) ->
135+
ok | {error, sac_error() | term()}.
130136
connection_reconnected(Pid) ->
131137
process_command(#command_connection_reconnected{pid = Pid}).
132138

@@ -147,7 +153,7 @@ wrap_cmd(Cmd) ->
147153
%% return the current groups for a given virtual host
148154
-spec consumer_groups(binary(), [atom()]) ->
149155
{ok,
150-
[term()] | {error, atom()}}.
156+
[term()]} | {error, sac_error() | term()}.
151157
consumer_groups(VirtualHost, InfoKeys) ->
152158
case ra_local_query(fun(State) ->
153159
SacState =
@@ -168,7 +174,7 @@ consumer_groups(VirtualHost, InfoKeys) ->
168174
%% get the consumers of a given group in a given virtual host
169175
-spec group_consumers(binary(), binary(), binary(), [atom()]) ->
170176
{ok, [term()]} |
171-
{error, atom()}.
177+
{error, sac_error() | term()}.
172178
group_consumers(VirtualHost, Stream, Reference, InfoKeys) ->
173179
case ra_local_query(fun(State) ->
174180
SacState =
@@ -926,6 +932,10 @@ state_enter(leader, #?MODULE{groups = Groups} = State)
926932
state_enter(_, _) ->
927933
[].
928934

935+
-spec is_sac_error(term()) -> boolean().
936+
is_sac_error(Reason) ->
937+
lists:member(Reason, ?SAC_ERRORS).
938+
929939
nodes_from_group(#group{consumers = Cs}) when is_list(Cs) ->
930940
lists:foldl(fun(#consumer{pid = Pid}, Acc) ->
931941
Acc#{node(Pid) => true}

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 53 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
-define(UNKNOWN_FIELD, unknown_field).
8282
-define(SILENT_CLOSE_DELAY, 3_000).
8383
-define(IS_INVALID_REF(Ref), is_binary(Ref) andalso byte_size(Ref) > 255).
84+
-define(SAC_MOD, rabbit_stream_sac_coordinator).
8485

8586
-import(rabbit_stream_utils, [check_write_permitted/2,
8687
check_read_permitted/3]).
@@ -722,7 +723,7 @@ open(info, {OK, S, Data},
722723
connection_state = State2}}
723724
end;
724725
open(info, {sac, check_connection, _}, State) ->
725-
rabbit_stream_sac_coordinator:connection_reconnected(self()),
726+
sac_connection_reconnected(self()),
726727
{keep_state, State};
727728
open(info,
728729
{sac, #{subscription_id := SubId,
@@ -800,9 +801,7 @@ open(info,
800801
consumer_name := ConsumerName}} ->
801802
rabbit_log:debug("Former active consumer gone, activating consumer " ++
802803
"on stream ~tp, group ~tp", [St, ConsumerName]),
803-
_ = rabbit_stream_sac_coordinator:activate_consumer(VirtualHost,
804-
St,
805-
ConsumerName);
804+
sac_activate_consumer(VirtualHost, St, ConsumerName);
806805
_ ->
807806
ok
808807
end,
@@ -2554,9 +2553,8 @@ handle_frame_post_auth(Transport,
25542553
rabbit_log:debug("Subscription ~tp on stream ~tp, group ~tp " ++
25552554
"has stepped down, activating consumer",
25562555
[SubscriptionId, Stream, ConsumerName]),
2557-
_ = rabbit_stream_sac_coordinator:activate_consumer(VirtualHost,
2558-
Stream,
2559-
ConsumerName),
2556+
sac_activate_consumer(VirtualHost, Stream,
2557+
ConsumerName),
25602558
ok;
25612559
_ ->
25622560
ok
@@ -3015,21 +3013,9 @@ handle_subscription(Transport,#stream_connection{
30153013

30163014
maybe_register_consumer(_, _, _, _, _, _, false = _Sac) ->
30173015
{ok, true};
3018-
maybe_register_consumer(VirtualHost,
3019-
Stream,
3020-
ConsumerName,
3021-
ConnectionName,
3022-
SubscriptionId,
3023-
Properties,
3024-
true) ->
3025-
PartitionIndex = partition_index(VirtualHost, Stream, Properties),
3026-
rabbit_stream_sac_coordinator:register_consumer(VirtualHost,
3027-
Stream,
3028-
PartitionIndex,
3029-
ConsumerName,
3030-
self(),
3031-
ConnectionName,
3032-
SubscriptionId).
3016+
maybe_register_consumer(VH, St, Name, ConnName, SubId, Properties, true) ->
3017+
PartitionIndex = partition_index(VH, St, Properties),
3018+
sac_register_consumer(VH, St, PartitionIndex, Name, self(), ConnName, SubId).
30333019

30343020
maybe_send_consumer_update(Transport,
30353021
Connection = #stream_connection{
@@ -3175,13 +3161,12 @@ maybe_unregister_consumer(VirtualHost,
31753161
ConsumerName = consumer_name(Properties),
31763162

31773163
Requests1 = maps:fold(
3178-
fun(_, #request{content =
3179-
#{active := false,
3180-
subscription_id := SubId,
3181-
stepping_down := true}}, Acc) when SubId =:= SubscriptionId ->
3182-
_ = rabbit_stream_sac_coordinator:activate_consumer(VirtualHost,
3183-
Stream,
3184-
ConsumerName),
3164+
fun(_, #request{content = #{active := false,
3165+
subscription_id := SubId,
3166+
stepping_down := true}}, Acc)
3167+
when SubId =:= SubscriptionId ->
3168+
sac_activate_consumer(VirtualHost, Stream,
3169+
ConsumerName),
31853170
rabbit_log:debug("Outstanding SAC activation request for stream '~tp', " ++
31863171
"group '~tp', sending activation.",
31873172
[Stream, ConsumerName]),
@@ -3190,11 +3175,8 @@ maybe_unregister_consumer(VirtualHost,
31903175
Acc#{K => V}
31913176
end, maps:new(), Requests),
31923177

3193-
_ = rabbit_stream_sac_coordinator:unregister_consumer(VirtualHost,
3194-
Stream,
3195-
ConsumerName,
3196-
self(),
3197-
SubscriptionId),
3178+
sac_unregister_consumer(VirtualHost, Stream, ConsumerName,
3179+
self(), SubscriptionId),
31983180
Requests1.
31993181

32003182
partition_index(VirtualHost, Stream, Properties) ->
@@ -4037,3 +4019,40 @@ stream_from_consumers(SubId, Consumers) ->
40374019
%% for a bit so they can't DOS us with repeated failed logins etc.
40384020
silent_close_delay() ->
40394021
timer:sleep(?SILENT_CLOSE_DELAY).
4022+
4023+
sac_connection_reconnected(Pid) ->
4024+
sac_call(fun() ->
4025+
?SAC_MOD:connection_reconnected(Pid)
4026+
end).
4027+
4028+
sac_activate_consumer(VH, St, Name) ->
4029+
sac_call(fun() ->
4030+
?SAC_MOD:activate_consumer(VH, St, Name)
4031+
end).
4032+
4033+
sac_register_consumer(VH, St, PartitionIndex, Name, Pid, ConnName, SubId) ->
4034+
sac_call(fun() ->
4035+
?SAC_MOD:register_consumer(VH, St, PartitionIndex,
4036+
Name, Pid, ConnName,
4037+
SubId)
4038+
end).
4039+
4040+
sac_unregister_consumer(VH, St, Name, Pid, SubId) ->
4041+
sac_call(fun() ->
4042+
?SAC_MOD:unregister_consumer(VH, St, Name, Pid, SubId)
4043+
end).
4044+
4045+
sac_call(Call) ->
4046+
case Call() of
4047+
{error, Reason} = Err ->
4048+
case ?SAC_MOD:is_sac_error(Reason) of
4049+
true ->
4050+
Err;
4051+
_ ->
4052+
rabbit_log:info("Stream SAC coordinator call failed with ~tp",
4053+
[Reason]),
4054+
throw({stop, {shutdown, stream_sac_coordinator_error}})
4055+
end;
4056+
R ->
4057+
R
4058+
end.

0 commit comments

Comments
 (0)