Skip to content

Commit a66c716

Browse files
authored
Speed up fanout exchange (#14546)
* Add test case for binding args Khepri regression This commit adds a test case for a regression/bug that occurs in Khepri. ``` make -C deps/rabbit ct-bindings t=cluster:binding_args RABBITMQ_METADATA_STORE=mnesia ``` succeeds, but ``` make -C deps/rabbit ct-bindings t=cluster:binding_args RABBITMQ_METADATA_STORE=khepri ``` fails. The problem is that ETS table `rabbit_khepri_index_route` cannot differentiate between two bindings with different binding arguments, and therefore deletes entries too early, leading to wrong routing decisions. The solution to this bug is to include the binding arguments in the `rabbit_khepri_index_route` projection, similar to how the binding args are also included in the `rabbit_index_route` Mnesia table. This bug/regression is an edge case and exists if the source exchange type is `direct` or `fanout` and if different bindings arguments are used by client apps. Note that such binding arguments are entirely ignored when RabbitMQ performs routing decisions for the `direct` or `fanout` exchange. However, there might be client apps that use binding arguments to add some metadata to the binding, for example `app-id` or `user` or `purpose` and might use this metadata as a form of reference counting in deciding when to delete `auto-delete` exchanges or just for informational/operational purposes. * Fix regression with Khepri binding args Fix #14533 * Speed up fanout exchange Resolves #14531 ## What? Increase end-to-end message throughput for messages routed via the fanout exchange by ~42% (see benchmark below). In addition to the fanout exchange, a similar speed up is achieved for the following exchange types: * modulus hash * random * recent history This applies only if Khepri is enabled. ## How? Use an additional routing table (projection) whose table key is the source exchange. Looking up the destinations happens then by an ETS table key. Prior to this commit, CPUs were busy compiling the same match spec for every incoming message. ## Benchmark 1. Start RabbitMQ: ``` make run-broker RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+S 5" \ RABBITMQ_CONFIG_FILE="advanced.config" PLUGINS="rabbitmq_management" ``` where `advanced.config` contains: ``` [ {rabbitmq_management_agent, [ {disable_metrics_collector, true} ]} ]. ``` 2. Create a queue and binding: ``` deps/rabbitmq_management/bin/rabbitmqadmin declare queue queue_type=classic durable=true name=q1 && \ deps/rabbitmq_management/bin/rabbitmqadmin declare binding source=amq.fanout destination=q1 ``` 3. Create the load ``` java -jar target/perf-test.jar -p -e amq.fanout -u q1 -s 5 --autoack -z 60 ``` Before this commit: ``` sending rate avg: 97394 msg/s receiving rate avg: 97394 msg/s ``` After this commit: ``` sending rate avg: 138677 msg/s receiving rate avg: 138677 msg/s ``` The CPU flamegraph shows that `rabbit_exchange:route/3` consumes the following CPU amounts: * 13.5% before this commit * 3.4% after this commit ## Downsides Additional ETS memory usage for the new projection table. However, the new table does not store any binding entries for the following source exchange types: * direct * headers * topic * x-local-random * Add exchange binding tests Test that exchange bindings work correctly with the new projection tables `rabbit_khepri_route_by_source` and `rabbit_khepri_route_by_source_key`. * Always register all projections Khepri won’t modify a projection that is already registered (based on its name). * Protect ets:lookup_element/4 in try catch See #11667 (comment) for rationale.
1 parent 18e4bf6 commit a66c716

File tree

5 files changed

+195
-47
lines changed

5 files changed

+195
-47
lines changed

deps/rabbit/src/rabbit_core_ff.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,5 +216,6 @@
216216
{'rabbitmq_4.2.0',
217217
#{desc => "Allows rolling upgrades to 4.2.x",
218218
stability => stable,
219-
depends_on => ['rabbitmq_4.1.0']
219+
depends_on => ['rabbitmq_4.1.0'],
220+
callbacks => #{enable => {rabbit_khepri, enable_feature_flag}}
220221
}}).

deps/rabbit/src/rabbit_db_binding.erl

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@
5656
-define(MNESIA_REVERSE_TABLE, rabbit_reverse_route).
5757
-define(MNESIA_INDEX_TABLE, rabbit_index_route).
5858
-define(KHEPRI_BINDINGS_PROJECTION, rabbit_khepri_binding).
59-
-define(KHEPRI_INDEX_ROUTE_PROJECTION, rabbit_khepri_index_route).
59+
-define(KHEPRI_ROUTE_BY_SOURCE_KEY_PROJECTION, rabbit_khepri_route_by_source_key).
60+
-define(KHEPRI_ROUTE_BY_SOURCE_PROJECTION, rabbit_khepri_route_by_source).
6061

6162
%% -------------------------------------------------------------------
6263
%% exists().
@@ -708,10 +709,10 @@ match_routing_key_in_mnesia(SrcName, RoutingKeys, UseIndex) ->
708709

709710
match_routing_key_in_khepri(Src, ['_']) ->
710711
try
711-
MatchHead = #index_route{source_key = {Src, '_'},
712-
destination = '$1',
713-
_ = '_'},
714-
ets:select(?KHEPRI_INDEX_ROUTE_PROJECTION, [{MatchHead, [], ['$1']}])
712+
ets:lookup_element(?KHEPRI_ROUTE_BY_SOURCE_PROJECTION,
713+
Src,
714+
#route_by_source.destination,
715+
[])
715716
catch
716717
error:badarg ->
717718
[]
@@ -721,7 +722,7 @@ match_routing_key_in_khepri(Src, RoutingKeys) ->
721722
fun(RK, Acc) ->
722723
try
723724
Dst = ets:lookup_element(
724-
?KHEPRI_INDEX_ROUTE_PROJECTION,
725+
?KHEPRI_ROUTE_BY_SOURCE_KEY_PROJECTION,
725726
{Src, RK},
726727
#index_route.destination),
727728
Dst ++ Acc

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 86 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060
%% executed. If the migration runs concurrently, whether it started before or
6161
%% during the execution of the Mnesia-specific anonymous function, {@link
6262
%% handle_fallback/1} will watch for "no exists" table exceptions from Mnesia
63-
%% and will retry the Mnesia functino or run the Khepri function accordingly.
63+
%% and will retry the Mnesia function or run the Khepri function accordingly.
6464
%% The Mnesia function must be idempotent because it can be executed multiple
6565
%% times.
6666
%%
@@ -170,6 +170,7 @@
170170
%% equivalent cluster
171171
-export([khepri_db_migration_enable/1,
172172
khepri_db_migration_post_enable/1,
173+
enable_feature_flag/1,
173174
is_enabled/0, is_enabled/1,
174175
get_feature_state/0, get_feature_state/1,
175176
handle_fallback/1]).
@@ -331,10 +332,7 @@ init(IsVirgin) ->
331332
"local Khepri-based " ?RA_FRIENDLY_NAME " member is caught "
332333
"up to the Raft cluster leader", [],
333334
#{domain => ?RMQLOG_DOMAIN_DB}),
334-
ok ?= case IsVirgin of
335-
true -> register_projections();
336-
false -> ok
337-
end,
335+
ok ?= register_projections(),
338336
%% Delete transient queues on init.
339337
%% Note that we also do this in the
340338
%% `rabbit_amqqueue:on_node_down/1' callback. We must try this
@@ -1318,7 +1316,8 @@ register_projections() ->
13181316
fun register_rabbit_per_vhost_runtime_parameters_projection/0,
13191317
fun register_rabbit_user_permissions_projection/0,
13201318
fun register_rabbit_bindings_projection/0,
1321-
fun register_rabbit_index_route_projection/0,
1319+
fun register_rabbit_route_by_source_key_projection/0,
1320+
fun register_rabbit_route_by_source_projection/0,
13221321
fun register_rabbit_topic_graph_projection/0],
13231322
rabbit_misc:for_each_while_ok(
13241323
fun(RegisterFun) ->
@@ -1414,35 +1413,78 @@ register_rabbit_bindings_projection() ->
14141413
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
14151414
khepri:register_projection(?STORE_ID, PathPattern, Projection).
14161415

1417-
register_rabbit_index_route_projection() ->
1418-
MapFun = fun(Path, _) ->
1419-
{
1420-
VHost,
1421-
ExchangeName,
1422-
Kind,
1423-
DstName,
1424-
RoutingKey
1425-
} = rabbit_db_binding:khepri_route_path_to_args(Path),
1426-
Exchange = rabbit_misc:r(VHost, exchange, ExchangeName),
1427-
Destination = rabbit_misc:r(VHost, Kind, DstName),
1428-
SourceKey = {Exchange, RoutingKey},
1429-
#index_route{source_key = SourceKey,
1430-
destination = Destination}
1416+
register_rabbit_route_by_source_key_projection() ->
1417+
MapFun = fun(_Path, #binding{source = Source,
1418+
key = Key,
1419+
destination = Destination,
1420+
args = Args}) ->
1421+
#index_route{source_key = {Source, Key},
1422+
destination = Destination,
1423+
args = Args}
14311424
end,
14321425
ProjectionFun = projection_fun_for_sets(MapFun),
14331426
Options = #{type => bag,
14341427
keypos => #index_route.source_key,
14351428
read_concurrency => true},
1436-
Projection = khepri_projection:new(
1437-
rabbit_khepri_index_route, ProjectionFun, Options),
1438-
DirectOrFanout = #if_data_matches{
1439-
pattern = #exchange{type = '$1', _ = '_'},
1440-
conditions = [{'andalso',
1441-
{'=/=', '$1', headers},
1442-
{'=/=', '$1', topic}}]},
1429+
Projection = khepri_projection:new(rabbit_khepri_route_by_source_key,
1430+
ProjectionFun,
1431+
Options),
1432+
Exchange = #if_data_matches{
1433+
pattern = #exchange{type = '$1', _ = '_'},
1434+
conditions = [{'andalso',
1435+
{'=/=', '$1', headers},
1436+
{'=/=', '$1', topic},
1437+
{'=/=', '$1', fanout},
1438+
{'=/=', '$1', 'x-jms-topic'},
1439+
{'=/=', '$1', 'x-random'}
1440+
}]},
14431441
PathPattern = rabbit_db_binding:khepri_route_path(
14441442
_VHost = ?KHEPRI_WILDCARD_STAR,
1445-
_Exchange = DirectOrFanout,
1443+
Exchange,
1444+
_Kind = ?KHEPRI_WILDCARD_STAR,
1445+
_DstName = ?KHEPRI_WILDCARD_STAR,
1446+
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
1447+
khepri:register_projection(?STORE_ID, PathPattern, Projection).
1448+
1449+
register_rabbit_route_by_source_projection() ->
1450+
MapFun = fun(_Path, #binding{source = Source,
1451+
key = Key,
1452+
destination = Destination,
1453+
args = Args}) ->
1454+
#route_by_source{source = Source,
1455+
key = Key,
1456+
destination = Destination,
1457+
args = Args}
1458+
end,
1459+
ProjectionFun = projection_fun_for_sets(MapFun),
1460+
Options = #{type => bag,
1461+
keypos => #route_by_source.source,
1462+
read_concurrency => true},
1463+
Projection = khepri_projection:new(rabbit_khepri_route_by_source,
1464+
ProjectionFun,
1465+
Options),
1466+
%% For some exchange types we know that they won't use this projection.
1467+
%% So we exclude such bindings for two reasons:
1468+
%% 1. Lower overall ETS memory usage
1469+
%% 2. "Avoid inserting an extensive amount of objects with the same key.
1470+
%% It will hurt insert and lookup performance as well as real time characteristics
1471+
%% of the runtime environment (hash bucket linear search do not yield)."
1472+
%% Example: same source direct exchange with 100k different binding keys.
1473+
%% In future, rather than exchange types exclusion as done here, a nicer approach
1474+
%% would be that each exchange requiring routing lookup by only source exchange
1475+
%% advertises this access pattern, e.g. as a boolean flag in the #exchange.options field.
1476+
Exchange = #if_data_matches{
1477+
pattern = #exchange{type = '$1', _ = '_'},
1478+
conditions = [{'andalso',
1479+
{'=/=', '$1', headers},
1480+
{'=/=', '$1', topic},
1481+
{'=/=', '$1', direct},
1482+
{'=/=', '$1', 'x-local-random'},
1483+
{'=/=', '$1', 'x-jms-topic'}
1484+
}]},
1485+
PathPattern = rabbit_db_binding:khepri_route_path(
1486+
_VHost = ?KHEPRI_WILDCARD_STAR,
1487+
Exchange,
14461488
_Kind = ?KHEPRI_WILDCARD_STAR,
14471489
_DstName = ?KHEPRI_WILDCARD_STAR,
14481490
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
@@ -1761,6 +1803,22 @@ khepri_db_migration_post_enable(
17611803
_ = mnesia_to_khepri:rollback_table_copy(?STORE_ID, ?MIGRATION_ID),
17621804
ok.
17631805

1806+
enable_feature_flag(#{command := enable,
1807+
feature_name := 'rabbitmq_4.2.0' = FeatureName}) ->
1808+
%% We unregister this projection because it's superseded by
1809+
%% rabbit_khepri_route_by_source_key introduced in 4.2.0
1810+
ProjectionName = rabbit_khepri_index_route,
1811+
Result = try khepri:unregister_projections(?STORE_ID, [ProjectionName])
1812+
catch _:Reason -> Reason
1813+
end,
1814+
?LOG_DEBUG(
1815+
"enabling feature flag ~s unregisters projection ~s: ~tp",
1816+
[FeatureName, ProjectionName, Result],
1817+
#{domain => ?RMQLOG_DOMAIN_DB}),
1818+
ok;
1819+
enable_feature_flag(_) ->
1820+
ok.
1821+
17641822
-spec sync_cluster_membership_from_mnesia(FeatureName) -> Ret when
17651823
FeatureName :: rabbit_feature_flags:feature_name(),
17661824
Ret :: ok | {error, Reason},

deps/rabbit/test/bindings_SUITE.erl

Lines changed: 99 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
1414

1515
-compile([nowarn_export_all, export_all]).
16-
-compile(export_all).
1716

1817
suite() ->
1918
[{timetrap, 5 * 60000}].
@@ -49,8 +48,12 @@ all_tests() ->
4948
list_with_multiple_vhosts,
5049
list_with_multiple_arguments,
5150
bind_to_unknown_queue,
51+
binding_args_direct_exchange,
52+
binding_args_fanout_exchange,
53+
5254
%% Exchange bindings
53-
bind_and_unbind_exchange,
55+
bind_and_unbind_direct_exchange,
56+
bind_and_unbind_fanout_exchange,
5457
bind_and_delete_exchange_source,
5558
bind_and_delete_exchange_destination,
5659
bind_to_unknown_exchange,
@@ -116,6 +119,7 @@ end_per_testcase(Testcase, Config) ->
116119
%% -------------------------------------------------------------------
117120
%% Testcases.
118121
%% -------------------------------------------------------------------
122+
119123
bind_and_unbind(Config) ->
120124
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
121125

@@ -697,33 +701,116 @@ bind_to_unknown_queue(Config) ->
697701
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>])),
698702
ok.
699703

700-
bind_and_unbind_exchange(Config) ->
704+
%% Test case for https://github.com/rabbitmq/rabbitmq-server/issues/14533
705+
binding_args_direct_exchange(Config) ->
706+
binding_args(<<"amq.direct">>, Config).
707+
708+
binding_args_fanout_exchange(Config) ->
709+
binding_args(<<"amq.fanout">>, Config).
710+
711+
binding_args(Exchange, Config) ->
712+
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
713+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
714+
Q = ?config(queue_name, Config),
715+
?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])),
716+
717+
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
718+
amqp_channel:register_confirm_handler(Ch, self()),
719+
720+
%% Create two bindings that differ only in their binding arguments.
721+
RoutingKey = <<"some-key">>,
722+
BindingArgs1 = [{<<"app">>, longstr, <<"app-1">>}],
723+
BindingArgs2 = [{<<"app">>, longstr, <<"app-2">>}],
724+
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange,
725+
routing_key = RoutingKey,
726+
queue = Q,
727+
arguments = BindingArgs1}),
728+
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange,
729+
routing_key = RoutingKey,
730+
queue = Q,
731+
arguments = BindingArgs2}),
732+
ok = amqp_channel:cast(Ch,
733+
#'basic.publish'{exchange = Exchange,
734+
routing_key = RoutingKey},
735+
#amqp_msg{payload = <<"m1">>}),
736+
receive #'basic.ack'{} -> ok
737+
after 9000 -> ct:fail(confirm_timeout)
738+
end,
739+
740+
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}},
741+
amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})),
742+
743+
%% If we delete the 1st binding, we expect RabbitMQ to still route via the 2nd binding.
744+
#'queue.unbind_ok'{} = amqp_channel:call(Ch, #'queue.unbind'{exchange = Exchange,
745+
routing_key = RoutingKey,
746+
queue = Q,
747+
arguments = BindingArgs1}),
748+
ok = amqp_channel:cast(Ch,
749+
#'basic.publish'{exchange = Exchange,
750+
routing_key = RoutingKey},
751+
#amqp_msg{payload = <<"m2">>}),
752+
receive #'basic.ack'{} -> ok
753+
after 9000 -> ct:fail(confirm_timeout)
754+
end,
755+
756+
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}},
757+
amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})).
758+
759+
bind_and_unbind_direct_exchange(Config) ->
760+
bind_and_unbind_exchange(<<"direct">>, Config).
761+
762+
bind_and_unbind_fanout_exchange(Config) ->
763+
bind_and_unbind_exchange(<<"fanout">>, Config).
764+
765+
bind_and_unbind_exchange(Type, Config) ->
701766
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
702767

703768
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
704769
X = ?config(exchange_name, Config),
770+
Q = ?config(queue_name, Config),
771+
RoutingKey = <<"some key">>,
772+
SourceExchange = <<"amq.", Type/binary>>,
705773

706774
?assertEqual([],
707775
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>])),
708776

709-
#'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = X}),
777+
#'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = X,
778+
type = Type}),
710779
%% Let's bind to other exchange
711780
#'exchange.bind_ok'{} = amqp_channel:call(Ch, #'exchange.bind'{destination = X,
712-
source = <<"amq.direct">>,
713-
routing_key = <<"key">>}),
781+
source = SourceExchange,
782+
routing_key = RoutingKey}),
714783

715-
DirectBinding = binding_record(rabbit_misc:r(<<"/">>, exchange, <<"amq.direct">>),
716-
rabbit_misc:r(<<"/">>, exchange, X),
717-
<<"key">>, []),
784+
Binding = binding_record(rabbit_misc:r(<<"/">>, exchange, SourceExchange),
785+
rabbit_misc:r(<<"/">>, exchange, X),
786+
RoutingKey, []),
718787

719-
?assertEqual([DirectBinding],
788+
?assertEqual([Binding],
720789
lists:sort(
721790
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>]))),
722791

792+
%% Test that a message gets routed:
793+
%% exchange -> exchange -> queue
794+
?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])),
795+
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = X,
796+
routing_key = RoutingKey,
797+
queue = Q}),
798+
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
799+
amqp_channel:register_confirm_handler(Ch, self()),
800+
ok = amqp_channel:cast(Ch,
801+
#'basic.publish'{exchange = SourceExchange,
802+
routing_key = RoutingKey},
803+
#amqp_msg{payload = <<"m1">>}),
804+
receive #'basic.ack'{} -> ok
805+
after 9000 -> ct:fail(confirm_timeout)
806+
end,
807+
?assertEqual(#'queue.delete_ok'{message_count = 1},
808+
amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
809+
723810
#'exchange.unbind_ok'{} = amqp_channel:call(Ch,
724811
#'exchange.unbind'{destination = X,
725-
source = <<"amq.direct">>,
726-
routing_key = <<"key">>}),
812+
source = SourceExchange,
813+
routing_key = RoutingKey}),
727814

728815
?assertEqual([],
729816
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list, [<<"/">>])),

deps/rabbit_common/include/rabbit.hrl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@
9494
-record(route, {binding, value = const}).
9595
-record(reverse_route, {reverse_binding, value = const}).
9696
-record(index_route, {source_key, destination, args = []}).
97+
-record(route_by_source, {source, key, destination, args = []}).
9798

9899
-record(binding, {source, key, destination, args = []}).
99100
-record(reverse_binding, {destination, key, source, args = []}).

0 commit comments

Comments
 (0)