Skip to content

Commit b68415a

Browse files
committed
Adapt stream code to Osiris read ahead
Osiris can read ahead data in case of small chunks. This saves system calls and increases consumption rate dramatically for some streams. This is transparent for the stream protocol, but requires a small tweak for the stream queue type implementation (passing in the previous iterator when creating a new one). The read ahead is on by default but can be deactivated with to the new stream.read_ahead configuration entry (true / false). References rabbitmq/osiris#192
1 parent 53f26c0 commit b68415a

File tree

6 files changed

+55
-14
lines changed

6 files changed

+55
-14
lines changed

deps/rabbit/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ define PROJECT_ENV
119119
{dead_letter_worker_consumer_prefetch, 32},
120120
{dead_letter_worker_publisher_confirm_timeout, 180000},
121121
{vhost_process_reconciliation_run_interval, 30},
122+
{stream_read_ahead, true},
122123
%% for testing
123124
{vhost_process_reconciliation_enabled, true},
124125
{license_line, "Licensed under the MPL 2.0. Website: https://rabbitmq.com"}

deps/rabbit/priv/schema/rabbit.schema

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2785,6 +2785,9 @@ fun(Conf) ->
27852785
end
27862786
end}.
27872787

2788+
{mapping, "stream.read_ahead", "rabbit.stream_read_ahead",
2789+
[{datatype, {enum, [true, false]}}]}.
2790+
27882791
{mapping, "cluster_tags.$tag", "rabbit.cluster_tags", [
27892792
{datatype, [binary]}
27902793
]}.

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
-export([format_osiris_event/2]).
5252
-export([update_stream_conf/2]).
5353
-export([readers/1]).
54+
-export([read_ahead_on/0]).
5455

5556
-export([parse_offset_arg/1,
5657
filter_spec/1]).
@@ -463,10 +464,11 @@ query_local_pid(#stream_client{stream_id = StreamId} = State) ->
463464
begin_stream(#stream_client{name = QName,
464465
readers = Readers0,
465466
local_pid = LocalPid} = State,
466-
Tag, Offset, Mode, AckRequired, Filter, Options)
467+
Tag, Offset, Mode, AckRequired, Filter, Options0)
467468
when is_pid(LocalPid) ->
468469
CounterSpec = {{?MODULE, QName, Tag, self()}, []},
469-
{ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec, Options),
470+
Options1 = Options0#{read_ahead => read_ahead_on()},
471+
{ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec, Options1),
470472
NextOffset = osiris_log:next_offset(Seg0) - 1,
471473
osiris:register_offset_listener(LocalPid, NextOffset),
472474
StartOffset = case Offset of
@@ -491,7 +493,7 @@ begin_stream(#stream_client{name = QName,
491493
last_consumed_offset = StartOffset,
492494
log = Seg0,
493495
filter = Filter,
494-
reader_options = Options},
496+
reader_options = Options1},
495497
{ok, State#stream_client{readers = Readers0#{Tag => Str0}}}.
496498

497499
cancel(_Q, #{consumer_tag := ConsumerTag,
@@ -659,8 +661,8 @@ handle_event(_QName, {stream_local_member_change, Pid},
659661
osiris_log:close(Log0),
660662
CounterSpec = {{?MODULE, QName, self()}, []},
661663
?LOG_DEBUG("Re-creating Osiris reader for consumer ~tp at offset ~tp "
662-
" with options ~tp",
663-
[T, Offset, Options]),
664+
" with options ~tp",
665+
[T, Offset, Options]),
664666
{ok, Log1} = osiris:init_reader(Pid, Offset, CounterSpec, Options),
665667
NextOffset = osiris_log:next_offset(Log1) - 1,
666668
?LOG_DEBUG("Registering offset listener at offset ~tp", [NextOffset]),
@@ -1176,7 +1178,7 @@ stream_entries(QName, Name, CTag, LocalPid,
11761178
credit = Credit} = Str0) ->
11771179
case Credit > 0 of
11781180
true ->
1179-
case chunk_iterator(Str0, LocalPid) of
1181+
case chunk_iterator(Str0, LocalPid, undefined) of
11801182
{ok, Str} ->
11811183
stream_entries(QName, Name, CTag, LocalPid, Str);
11821184
{end_of_stream, Str} ->
@@ -1229,7 +1231,7 @@ stream_entries(QName, Name, CTag, LocalPid,
12291231
gen_server:cast(self(), queue_event(QName, {resume_filtering, CTag})),
12301232
{Str0#stream{filtering_paused = true}, lists:reverse(Acc0)};
12311233
end_of_chunk ->
1232-
case chunk_iterator(Str0, LocalPid) of
1234+
case chunk_iterator(Str0, LocalPid, Iter0) of
12331235
{ok, Str} ->
12341236
stream_entries(QName, Name, CTag, LocalPid, Str, Acc0);
12351237
{end_of_stream, Str} ->
@@ -1294,8 +1296,8 @@ stream_entries(QName, Name, CTag, LocalPid,
12941296

12951297
chunk_iterator(#stream{credit = Credit,
12961298
listening_offset = LOffs,
1297-
log = Log0} = Str0, LocalPid) ->
1298-
case osiris_log:chunk_iterator(Log0, Credit) of
1299+
log = Log0} = Str0, LocalPid, PrevIterator) ->
1300+
case osiris_log:chunk_iterator(Log0, Credit, PrevIterator) of
12991301
{ok, _ChunkHeader, Iter, Log} ->
13001302
{ok, Str0#stream{chunk_iterator = Iter,
13011303
log = Log}};
@@ -1527,3 +1529,6 @@ queue_vm_stats_sups() ->
15271529
queue_vm_ets() ->
15281530
{[],
15291531
[]}.
1532+
1533+
read_ahead_on() ->
1534+
application:get_env(rabbit, stream_read_ahead, true).

deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1223,6 +1223,28 @@ credential_validator.regexp = ^abc\\d+",
12231223
[{osiris, [
12241224
{port_range, {4100, 4600}}
12251225
]}],
1226+
[]},
1227+
1228+
%%
1229+
%% Stream read ahead on/off
1230+
%%
1231+
1232+
{stream_read_ahead,
1233+
"
1234+
stream.read_ahead = true
1235+
",
1236+
[{rabbit, [
1237+
{stream_read_ahead, true}
1238+
]}],
1239+
[]},
1240+
1241+
{stream_read_ahead,
1242+
"
1243+
stream.read_ahead = false
1244+
",
1245+
[{rabbit, [
1246+
{stream_read_ahead, false}
1247+
]}],
12261248
[]}
12271249

12281250
].

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2812,11 +2812,14 @@ init_reader(ConnectionTransport,
28122812
Properties,
28132813
OffsetSpec) ->
28142814
CounterSpec = {{?MODULE, QueueResource, SubscriptionId, self()}, []},
2815-
Options = maps:merge(#{transport => ConnectionTransport,
2816-
chunk_selector => get_chunk_selector(Properties)},
2817-
rabbit_stream_utils:filter_spec(Properties)),
2818-
{ok, Segment} =
2819-
osiris:init_reader(LocalMemberPid, OffsetSpec, CounterSpec, Options),
2815+
Options0 = #{transport => ConnectionTransport,
2816+
chunk_selector => get_chunk_selector(Properties),
2817+
read_ahead => rabbit_stream_queue:read_ahead_on()},
2818+
2819+
Options1 = maps:merge(Options0,
2820+
rabbit_stream_utils:filter_spec(Properties)),
2821+
{ok, Segment} = osiris:init_reader(LocalMemberPid, OffsetSpec,
2822+
CounterSpec, Options1),
28202823
?LOG_DEBUG("Next offset for subscription ~tp is ~tp",
28212824
[SubscriptionId, osiris_log:next_offset(Segment)]),
28222825
Segment.

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
-include_lib("eunit/include/eunit.hrl").
2121
-include_lib("rabbit_common/include/rabbit.hrl").
22+
-include_lib("amqp10_common/include/amqp10_framing.hrl").
2223
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
2324
-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl").
2425

@@ -1773,3 +1774,9 @@ request(CorrId, Cmd) ->
17731774

17741775
rand_bin() ->
17751776
base64:encode(rand:bytes(20)).
1777+
1778+
generate_log(MsgSize, MsgsPerChunk, NumMessages, Directory) ->
1779+
Body = binary:copy(<<"a">>, MsgSize),
1780+
Data = #'v1_0.data'{content = Body},
1781+
Bin = amqp10_framing:encode_bin(Data),
1782+
osiris_log:generate_log(Bin, MsgsPerChunk, NumMessages, Directory).

0 commit comments

Comments
 (0)