Skip to content

Commit a7827ba

Browse files
authored
Merge pull request #14189 from rabbitmq/stream-dispatching-improvements
Integrate Osiris reader optimizations for streams
2 parents 25db081 + 025280e commit a7827ba

File tree

7 files changed

+65
-32
lines changed

7 files changed

+65
-32
lines changed

deps/rabbit/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ define PROJECT_ENV
119119
{dead_letter_worker_consumer_prefetch, 32},
120120
{dead_letter_worker_publisher_confirm_timeout, 180000},
121121
{vhost_process_reconciliation_run_interval, 30},
122+
{stream_read_ahead, true},
122123
%% for testing
123124
{vhost_process_reconciliation_enabled, true},
124125
{license_line, "Licensed under the MPL 2.0. Website: https://rabbitmq.com"}

deps/rabbit/priv/schema/rabbit.schema

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2785,6 +2785,9 @@ fun(Conf) ->
27852785
end
27862786
end}.
27872787

2788+
{mapping, "stream.read_ahead", "rabbit.stream_read_ahead",
2789+
[{datatype, {enum, [true, false]}}]}.
2790+
27882791
{mapping, "cluster_tags.$tag", "rabbit.cluster_tags", [
27892792
{datatype, [binary]}
27902793
]}.

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
-export([format_osiris_event/2]).
5252
-export([update_stream_conf/2]).
5353
-export([readers/1]).
54+
-export([read_ahead_on/0]).
5455

5556
-export([parse_offset_arg/1,
5657
filter_spec/1]).
@@ -463,10 +464,11 @@ query_local_pid(#stream_client{stream_id = StreamId} = State) ->
463464
begin_stream(#stream_client{name = QName,
464465
readers = Readers0,
465466
local_pid = LocalPid} = State,
466-
Tag, Offset, Mode, AckRequired, Filter, Options)
467+
Tag, Offset, Mode, AckRequired, Filter, Options0)
467468
when is_pid(LocalPid) ->
468469
CounterSpec = {{?MODULE, QName, Tag, self()}, []},
469-
{ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec, Options),
470+
Options1 = Options0#{read_ahead => read_ahead_on()},
471+
{ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec, Options1),
470472
NextOffset = osiris_log:next_offset(Seg0) - 1,
471473
osiris:register_offset_listener(LocalPid, NextOffset),
472474
StartOffset = case Offset of
@@ -491,7 +493,7 @@ begin_stream(#stream_client{name = QName,
491493
last_consumed_offset = StartOffset,
492494
log = Seg0,
493495
filter = Filter,
494-
reader_options = Options},
496+
reader_options = Options1},
495497
{ok, State#stream_client{readers = Readers0#{Tag => Str0}}}.
496498

497499
cancel(_Q, #{consumer_tag := ConsumerTag,
@@ -659,8 +661,8 @@ handle_event(_QName, {stream_local_member_change, Pid},
659661
osiris_log:close(Log0),
660662
CounterSpec = {{?MODULE, QName, self()}, []},
661663
?LOG_DEBUG("Re-creating Osiris reader for consumer ~tp at offset ~tp "
662-
" with options ~tp",
663-
[T, Offset, Options]),
664+
" with options ~tp",
665+
[T, Offset, Options]),
664666
{ok, Log1} = osiris:init_reader(Pid, Offset, CounterSpec, Options),
665667
NextOffset = osiris_log:next_offset(Log1) - 1,
666668
?LOG_DEBUG("Registering offset listener at offset ~tp", [NextOffset]),
@@ -1176,7 +1178,7 @@ stream_entries(QName, Name, CTag, LocalPid,
11761178
credit = Credit} = Str0) ->
11771179
case Credit > 0 of
11781180
true ->
1179-
case chunk_iterator(Str0, LocalPid) of
1181+
case chunk_iterator(Str0, LocalPid, undefined) of
11801182
{ok, Str} ->
11811183
stream_entries(QName, Name, CTag, LocalPid, Str);
11821184
{end_of_stream, Str} ->
@@ -1229,7 +1231,7 @@ stream_entries(QName, Name, CTag, LocalPid,
12291231
gen_server:cast(self(), queue_event(QName, {resume_filtering, CTag})),
12301232
{Str0#stream{filtering_paused = true}, lists:reverse(Acc0)};
12311233
end_of_chunk ->
1232-
case chunk_iterator(Str0, LocalPid) of
1234+
case chunk_iterator(Str0, LocalPid, Iter0) of
12331235
{ok, Str} ->
12341236
stream_entries(QName, Name, CTag, LocalPid, Str, Acc0);
12351237
{end_of_stream, Str} ->
@@ -1294,8 +1296,8 @@ stream_entries(QName, Name, CTag, LocalPid,
12941296

12951297
chunk_iterator(#stream{credit = Credit,
12961298
listening_offset = LOffs,
1297-
log = Log0} = Str0, LocalPid) ->
1298-
case osiris_log:chunk_iterator(Log0, Credit) of
1299+
log = Log0} = Str0, LocalPid, PrevIterator) ->
1300+
case osiris_log:chunk_iterator(Log0, Credit, PrevIterator) of
12991301
{ok, _ChunkHeader, Iter, Log} ->
13001302
{ok, Str0#stream{chunk_iterator = Iter,
13011303
log = Log}};
@@ -1527,3 +1529,6 @@ queue_vm_stats_sups() ->
15271529
queue_vm_ets() ->
15281530
{[],
15291531
[]}.
1532+
1533+
read_ahead_on() ->
1534+
application:get_env(rabbit, stream_read_ahead, true).

deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1223,6 +1223,28 @@ credential_validator.regexp = ^abc\\d+",
12231223
[{osiris, [
12241224
{port_range, {4100, 4600}}
12251225
]}],
1226+
[]},
1227+
1228+
%%
1229+
%% Stream read ahead on/off
1230+
%%
1231+
1232+
{stream_read_ahead,
1233+
"
1234+
stream.read_ahead = true
1235+
",
1236+
[{rabbit, [
1237+
{stream_read_ahead, true}
1238+
]}],
1239+
[]},
1240+
1241+
{stream_read_ahead,
1242+
"
1243+
stream.read_ahead = false
1244+
",
1245+
[{rabbit, [
1246+
{stream_read_ahead, false}
1247+
]}],
12261248
[]}
12271249

12281250
].

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2812,11 +2812,14 @@ init_reader(ConnectionTransport,
28122812
Properties,
28132813
OffsetSpec) ->
28142814
CounterSpec = {{?MODULE, QueueResource, SubscriptionId, self()}, []},
2815-
Options = maps:merge(#{transport => ConnectionTransport,
2816-
chunk_selector => get_chunk_selector(Properties)},
2817-
rabbit_stream_utils:filter_spec(Properties)),
2818-
{ok, Segment} =
2819-
osiris:init_reader(LocalMemberPid, OffsetSpec, CounterSpec, Options),
2815+
Options0 = #{transport => ConnectionTransport,
2816+
chunk_selector => get_chunk_selector(Properties),
2817+
read_ahead => rabbit_stream_queue:read_ahead_on()},
2818+
2819+
Options1 = maps:merge(Options0,
2820+
rabbit_stream_utils:filter_spec(Properties)),
2821+
{ok, Segment} = osiris:init_reader(LocalMemberPid, OffsetSpec,
2822+
CounterSpec, Options1),
28202823
?LOG_DEBUG("Next offset for subscription ~tp is ~tp",
28212824
[SubscriptionId, osiris_log:next_offset(Segment)]),
28222825
Segment.
@@ -3571,12 +3574,9 @@ subscription_exists(StreamSubscriptions, SubscriptionId) ->
35713574
lists:any(fun(Id) -> Id =:= SubscriptionId end, SubscriptionIds).
35723575

35733576
send_file_callback(?VERSION_1,
3574-
Transport,
35753577
_Log,
35763578
#consumer{configuration =
3577-
#consumer_configuration{socket = S,
3578-
subscription_id =
3579-
SubscriptionId,
3579+
#consumer_configuration{subscription_id = SubId,
35803580
counters = Counters}},
35813581
Counter) ->
35823582
fun(#{chunk_id := FirstOffsetInChunk, num_entries := NumEntries},
@@ -3587,19 +3587,16 @@ send_file_callback(?VERSION_1,
35873587
?REQUEST:1,
35883588
?COMMAND_DELIVER:15,
35893589
?VERSION_1:16,
3590-
SubscriptionId:8/unsigned>>,
3591-
Transport:send(S, FrameBeginning),
3590+
SubId:8/unsigned>>,
35923591
atomics:add(Counter, 1, Size),
35933592
increase_messages_consumed(Counters, NumEntries),
3594-
set_consumer_offset(Counters, FirstOffsetInChunk)
3593+
set_consumer_offset(Counters, FirstOffsetInChunk),
3594+
FrameBeginning
35953595
end;
35963596
send_file_callback(?VERSION_2,
3597-
Transport,
35983597
Log,
35993598
#consumer{configuration =
3600-
#consumer_configuration{socket = S,
3601-
subscription_id =
3602-
SubscriptionId,
3599+
#consumer_configuration{subscription_id = SubId,
36033600
counters = Counters}},
36043601
Counter) ->
36053602
fun(#{chunk_id := FirstOffsetInChunk, num_entries := NumEntries},
@@ -3611,12 +3608,12 @@ send_file_callback(?VERSION_2,
36113608
?REQUEST:1,
36123609
?COMMAND_DELIVER:15,
36133610
?VERSION_2:16,
3614-
SubscriptionId:8/unsigned,
3611+
SubId:8/unsigned,
36153612
CommittedChunkId:64>>,
3616-
Transport:send(S, FrameBeginning),
36173613
atomics:add(Counter, 1, Size),
36183614
increase_messages_consumed(Counters, NumEntries),
3619-
set_consumer_offset(Counters, FirstOffsetInChunk)
3615+
set_consumer_offset(Counters, FirstOffsetInChunk),
3616+
FrameBeginning
36203617
end.
36213618

36223619
send_chunks(DeliverVersion,
@@ -3686,9 +3683,7 @@ send_chunks(DeliverVersion,
36863683
Retry,
36873684
Counter) ->
36883685
case osiris_log:send_file(Socket, Log,
3689-
send_file_callback(DeliverVersion,
3690-
Transport,
3691-
Log,
3686+
send_file_callback(DeliverVersion, Log,
36923687
Consumer,
36933688
Counter))
36943689
of

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
-include_lib("eunit/include/eunit.hrl").
2121
-include_lib("rabbit_common/include/rabbit.hrl").
22+
-include_lib("amqp10_common/include/amqp10_framing.hrl").
2223
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
2324
-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl").
2425

@@ -1773,3 +1774,9 @@ request(CorrId, Cmd) ->
17731774

17741775
rand_bin() ->
17751776
base64:encode(rand:bytes(20)).
1777+
1778+
generate_log(MsgSize, MsgsPerChunk, NumMessages, Directory) ->
1779+
Body = binary:copy(<<"a">>, MsgSize),
1780+
Data = #'v1_0.data'{content = Body},
1781+
Bin = amqp10_framing:encode_bin(Data),
1782+
osiris_log:generate_log(Bin, MsgsPerChunk, NumMessages, Directory).

rabbitmq-components.mk

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ dep_jose = hex 1.11.10
4949
dep_khepri = hex 0.17.2
5050
dep_khepri_mnesia_migration = hex 0.8.0
5151
dep_meck = hex 1.0.0
52-
dep_osiris = git https://github.com/rabbitmq/osiris v1.9.0
52+
dep_osiris = git https://github.com/rabbitmq/osiris v1.10.0
5353
dep_prometheus = hex 5.1.1
5454
dep_ra = hex 2.17.1
5555
dep_ranch = hex 2.2.0

0 commit comments

Comments
 (0)