diff --git a/Makefile b/Makefile index 9c6dd85c..067a3487 100644 --- a/Makefile +++ b/Makefile @@ -11,7 +11,8 @@ ESCRIPT_EMU_ARGS = -noinput -setcookie ra_fifo_cli dep_gen_batch_server = hex 0.10.0 dep_aten = hex 0.6.0 dep_seshat = hex 1.0.1 -DEPS = aten gen_batch_server seshat +dep_shu = git https://github.com/rabbitmq/shu main +DEPS = aten gen_batch_server seshat shu TEST_DEPS = proper meck inet_tcp_proxy @@ -24,7 +25,7 @@ dep_eunit_formatters = git https://github.com/seancribbs/eunit_formatters main DEP_PLUGINS = elvis_mk -PLT_APPS += eunit proper syntax_tools erts kernel stdlib common_test inets aten mnesia ssh ssl meck gen_batch_server inet_tcp_proxy +PLT_APPS += eunit proper syntax_tools erts kernel stdlib common_test inets aten mnesia ssh ssl meck gen_batch_server inet_tcp_proxy shu EDOC_OUTPUT = docs EDOC_OPTS = {pretty_printer, erl_pp}, {sort_functions, false} diff --git a/rebar.config b/rebar.config index 26b47db3..0af9d0fd 100644 --- a/rebar.config +++ b/rebar.config @@ -1,7 +1,8 @@ {deps, [ {gen_batch_server, "0.10.0"}, {aten, "0.6.0"}, - {seshat, "1.0.1"} + {seshat, "1.0.1"}, + {shu, {git, "https://github.com/rabbitmq/shu.git", {branch, "main"}}} ]}. {profiles, diff --git a/rebar.lock b/rebar.lock index 88ce61bb..496deadd 100644 --- a/rebar.lock +++ b/rebar.lock @@ -1,7 +1,11 @@ {"1.2.0", [{<<"aten">>,{pkg,<<"aten">>,<<"0.6.0">>},0}, {<<"gen_batch_server">>,{pkg,<<"gen_batch_server">>,<<"0.10.0">>},0}, - {<<"seshat">>,{pkg,<<"seshat">>,<<"1.0.1">>},0}]}. + {<<"seshat">>,{pkg,<<"seshat">>,<<"1.0.1">>},0}, + {<<"shu">>, + {git,"https://github.com/rabbitmq/shu.git", + {ref,"28adff94f0cc0e309b7be049cecc9d7107ef47c8"}}, + 0}]}. 
[ {pkg_hash,[ {<<"aten">>, <<"7A57B275A6DAF515AC3683FB9853E280B4D0DCDD74292FD66AC4A01C8694F8C7">>}, diff --git a/rebar3.crashdump b/rebar3.crashdump new file mode 100644 index 00000000..2972948e --- /dev/null +++ b/rebar3.crashdump @@ -0,0 +1,22 @@ +Error: terminated +[{io,format, + ["===> Verifying dependencies...~n",[]], + [{file,"io.erl"}, + {line,202}, + {error_info,#{cause => {io,terminated},module => erl_stdlib_errors}}]}, + {rebar_prv_install_deps,do,1, + [{file,"/home/runner/work/rebar3/rebar3/apps/rebar/src/rebar_prv_install_deps.erl"}, + {line,73}]}, + {rebar_core,do,2, + [{file,"/home/runner/work/rebar3/rebar3/apps/rebar/src/rebar_core.erl"}, + {line,155}]}, + {rebar3,run_aux,2, + [{file,"/home/runner/work/rebar3/rebar3/apps/rebar/src/rebar3.erl"}, + {line,205}]}, + {rebar3,main,1, + [{file,"/home/runner/work/rebar3/rebar3/apps/rebar/src/rebar3.erl"}, + {line,66}]}, + {init,start_it,1,[]}, + {init,start_em,1,[]}, + {init,do_boot,3,[]}] + diff --git a/src/ra.app.src b/src/ra.app.src index 289b3b9a..b2490970 100644 --- a/src/ra.app.src +++ b/src/ra.app.src @@ -5,7 +5,7 @@ {links,[{"github","https://github.com/rabbitmq/ra"}]}, {modules,[]}, {registered,[ra_sup]}, - {applications,[kernel,stdlib,sasl,crypto,aten,gen_batch_server, - seshat]}, + {applications,[kernel,stdlib,sasl,crypto,aten,gen_batch_server, + seshat,shu]}, {mod,{ra_app,[]}}, {env,[]}]}. diff --git a/src/ra_log_meta.erl b/src/ra_log_meta.erl index e44b65fc..0a932ed8 100644 --- a/src/ra_log_meta.erl +++ b/src/ra_log_meta.erl @@ -18,7 +18,8 @@ delete/2, delete_sync/2, fetch/3, - fetch/4 + fetch/4, + await/1 ]). -include("ra.hrl"). @@ -26,13 +27,16 @@ %% centralised meta data storage server for ra servers. -type key() :: current_term | voted_for | last_applied. --type value() :: non_neg_integer() | atom() | {atom(), atom()}. +-type value() :: non_neg_integer() | atom() | {atom() | binary(), atom()} | {binary(), atom()}. -define(TIMEOUT, 30000). --define(SYNC_INTERVAL, 5000). 
+% -define(SYNC_INTERVAL, 5000). --record(?MODULE, {ref :: reference(), - table_name :: atom()}). +-record(?MODULE, {shu :: shu:state(), + table_name :: atom(), + data_dir :: file:filename_all(), + compact_pid :: undefined | pid(), + compact_mref :: undefined | reference()}). -opaque state() :: #?MODULE{}. @@ -49,18 +53,42 @@ init(#{name := System, names := #{log_meta := TblName}}) -> process_flag(trap_exit, true), ok = ra_lib:make_dir(Dir), - MetaFile = filename:join(Dir, "meta.dets"), - {ok, Ref} = dets:open_file(TblName, [{file, MetaFile}, - {auto_save, ?SYNC_INTERVAL}]), + MetaShu = filename:join(Dir, "meta.shu"), + MetaDets = filename:join(Dir, "meta.dets"), + + Schema = schema(), + {ok, ShuState0} = shu:open(MetaShu, Schema), + + %% Create ETS table as today _ = ets:new(TblName, [named_table, public, {read_concurrency, true}]), - TblName = dets:to_ets(TblName, TblName), - ?INFO("ra: meta data store initialised for system ~ts. ~b record(s) recovered", - [System, ets:info(TblName, size)]), - {ok, #?MODULE{ref = Ref, - table_name = TblName}}. - -handle_batch(Commands, #?MODULE{ref = Ref, - table_name = TblName} = State) -> + + %% Migration from DETS if present + {RecoveredCount, ShuState1} = case filelib:is_file(MetaDets) of + true -> + migrate_from_dets(MetaDets, ShuState0, + TblName); + false -> + {0, ShuState0} + end, + + %% Populate ETS from shu + ok = populate_ets_from_shu(TblName, ShuState1), + ETSCount = ets:info(TblName, size), + + ?INFO("ra: meta data store initialised for system ~ts. ~b record(s) " + "converted from DETS, ~b total records", + [System, RecoveredCount, + % case RecoveredCount of + % 0 -> "shu"; + % _ -> "dets" + % end, + ETSCount]), + + {ok, #?MODULE{shu = ShuState1, + table_name = TblName, + data_dir = Dir}}. 
+ +handle_batch(Commands, #?MODULE{table_name = TblName} = State) -> DoInsert = fun (Id, Key, Value, Inserts0) -> case Inserts0 of @@ -76,39 +104,87 @@ handle_batch(Commands, #?MODULE{ref = Ref, end end end, - {Inserts, Replies, ShouldSync} = + {Inserts, Replies, FinalState} = lists:foldl( fun ({cast, {store, Id, Key, Value}}, - {Inserts0, Replies, DoSync}) -> - {DoInsert(Id, Key, Value, Inserts0), Replies, DoSync}; + {Inserts0, Replies0, State0}) -> + {DoInsert(Id, Key, Value, Inserts0), Replies0, State0}; ({call, From, {store, Id, Key, Value}}, - {Inserts0, Replies, _DoSync}) -> + {Inserts0, Replies0, State0}) -> {DoInsert(Id, Key, Value, Inserts0), - [{reply, From, ok} | Replies], true}; + [{reply, From, ok} | Replies0], State0}; ({cast, {delete, Id}}, - {Inserts0, Replies, DoSync}) -> - {handle_delete(TblName, Id, Ref, Inserts0), Replies, DoSync}; + {Inserts0, Replies0, State0}) -> + {handle_delete(TblName, Id, Inserts0), Replies0, State0}; ({call, From, {delete, Id}}, - {Inserts0, Replies, _DoSync}) -> - {handle_delete(TblName, Id, Ref, Inserts0), - [{reply, From, ok} | Replies], true} - end, {#{}, [], false}, Commands), + {Inserts0, Replies0, State0}) -> + {handle_delete(TblName, Id, Inserts0), + [{reply, From, ok} | Replies0], State0}; + ({call, From, ping}, + {Inserts0, Replies0, State0}) -> + {Inserts0, [{reply, From, ok} | Replies0], State0}; + ({info, {'DOWN', MRef, process, _Pid, {compact_result, Result}}}, + {Inserts0, Replies0, State0}) when State0#?MODULE.compact_mref == MRef -> + case shu:finish_compact(Result, State0#?MODULE.shu) of + {ok, S1} -> + {Inserts0, Replies0, State0#?MODULE{shu = S1, compact_pid = undefined, + compact_mref = undefined}}; + {error, Reason} -> + ?ERROR("ra_log_meta: compaction finish failed: ~p", [Reason]), + exit({compaction_failed, Reason}) + end; + ({info, {'DOWN', _MRef, process, Pid, Reason}}, + {_Inserts0, _Replies0, State0}) when State0#?MODULE.compact_pid == Pid -> + ?ERROR("ra_log_meta: compaction worker ~p 
crashed: ~p", [Pid, Reason]), + exit({compaction_worker_crashed, Reason}); + ({info, Info}, {Inserts0, Replies0, State0}) -> + ?ERROR("ra_log_meta: unexpected info message: ~p", [Info]), + {Inserts0, Replies0, State0}; + (Unhandled, Acc) -> + ?DEBUG("ra: meta data unhandled ~p", [Unhandled]), + Acc + end, {#{}, [], State}, Commands), + Objects = maps:values(Inserts), true = ets:insert(TblName, Objects), - ok = dets:insert(TblName, Objects), - case ShouldSync of - true -> - ok = dets:sync(TblName); - false -> - ok - end, - {ok, Replies, State}. - -terminate(_, #?MODULE{ref = Ref, - table_name = TblName}) -> + + %% Translate to shu write_batch format + WriteOps = [to_shu_write_op(Obj) || Obj <- Objects], + + %% Write to shu - shu handles syncing based on schema frequency config + case shu:write_batch(FinalState#?MODULE.shu, WriteOps) of + {ok, S1} -> + {ok, Replies, FinalState#?MODULE{shu = S1}}; + {wal_full, S1} -> + %% WAL is full, kick off background compaction and retry + State1 = start_compact(FinalState#?MODULE{shu = S1}), + %% After setting compacting = true, retry the write + %% The new write will be buffered in memory until compaction completes + case shu:write_batch(State1#?MODULE.shu, WriteOps) of + {ok, S2} -> + {ok, Replies, State1#?MODULE{shu = S2}}; + {wal_full, _S2} -> + %% Still full after compacting=true - should not happen + ?ERROR("ra_log_meta: WAL still full after starting compaction for ~ts", [TblName]), + exit({wal_full_after_compaction, TblName}); + {error, Reason} = Err -> + ?ERROR("ra_log_meta: write_batch failed: ~p", [Reason]), + exit(Err) + end; + {error, Reason} = Err -> + ?ERROR("ra_log_meta: write_batch failed: ~p", [Reason]), + exit(Err) + end. 
+ +terminate(_, #?MODULE{shu = S0, compact_mref = MRef} = State) -> ?DEBUG("ra: meta data store is terminating", []), - ok = dets:sync(TblName), - _ = dets:close(Ref), + %% If a compaction is in flight, wait for it to finish + S1 = case MRef of + undefined -> S0; + _ -> + await_compaction(State, 30_000) + end, + ok = shu:close(S1), ok. format_status(State) -> @@ -133,6 +209,11 @@ delete(Name, UId) -> delete_sync(Name, UId) -> gen_batch_server:call(Name, {delete, UId}, ?TIMEOUT). +%% Wait for the metadata store to be ready (used in tests) +-spec await(atom()) -> ok. +await(Name) -> + gen_batch_server:call(Name, ping, ?TIMEOUT). + %% READER API -spec fetch(atom(), ra_uid(), key()) -> value() | undefined. @@ -159,10 +240,9 @@ maybe_fetch(MetaName, Id, Pos) -> undefined end. -handle_delete(TblName, Id, Ref, Inserts) -> - _ = dets:delete(Ref, Id), - _ = ets:delete(TblName, Id), - maps:remove(Id, Inserts). +handle_delete(TblName, Id, Inserts) -> + _ = ets:delete(TblName, Id), + maps:remove(Id, Inserts). update_key(current_term, Value, Data) -> case element(2, Data) of @@ -178,3 +258,165 @@ update_key(voted_for, Value, Data) -> setelement(3, Data, Value); update_key(last_applied, Value, Data) -> setelement(4, Data, Value). + +%% Helper to convert ETS row {UId, CT, VF, LA} to shu write operations +to_shu_write_op({UId, CurrentTerm, VotedFor, LastApplied}) -> + FieldValues1 = case CurrentTerm of + undefined -> + []; + _ -> + [{current_term, CurrentTerm}] + end, + FieldValues2 = case VotedFor of + undefined -> + FieldValues1; + _ -> + {ServerName, Node} = decode_voted_for(VotedFor), + ServerNameBin = case ServerName of + undefined -> + undefined; + S when is_atom(S) -> + atom_to_binary(S, utf8); + B when is_binary(B) -> + B + end, + [{voted_for_name, ServerNameBin}, + {voted_for_node, Node} | FieldValues1] + end, + FieldValues3 = case LastApplied of + undefined -> + FieldValues2; + _ -> + [{last_applied, LastApplied} | FieldValues2] + end, + {UId, FieldValues3}. 
+
+%% Split a voted_for value into a {ServerName, Node} pair.
+%% A {ServerName, Node} tuple is returned as-is.
+%% `undefined` becomes {undefined, undefined}.
+%% Any other atom (old format) is carried in the Node slot: {undefined, Atom}.
+decode_voted_for({_, _} = ServerId) ->
+    ServerId;
+decode_voted_for(undefined) ->
+    {undefined, undefined};
+decode_voted_for(Atom) when is_atom(Atom) ->
+    {undefined, Atom}.
+
+%% Schema definition for shu: four fields plus the key type.
+schema() ->
+    #{fields => [#{name => current_term,
+                   type => {integer, 64},
+                   frequency => low},
+                 #{name => voted_for_name,
+                   type => {binary, 255},
+                   frequency => low},
+                 #{name => voted_for_node,
+                   type => {atom, 255},
+                   frequency => low},
+                 #{name => last_applied,
+                   type => {integer, 64},
+                   frequency => high}],
+      key => {binary, 64},
+      expected_count => 50000}.
+
+%% Populate ETS table from shu on startup
+populate_ets_from_shu(TblName, ShuState) ->
+    shu:fold(
+      fun(Key, Fields, Acc) ->
+              CT = maps:get(current_term, Fields, undefined),
+              Node = maps:get(voted_for_node, Fields, undefined),
+              ServerNameBin = maps:get(voted_for_name, Fields, undefined),
+              %% a binary name read from shu is turned back into an atom
+              ServerName = case ServerNameBin of
+                               undefined ->
+                                   undefined;
+                               B when is_binary(B) ->
+                                   binary_to_atom(B, utf8);
+                               _ ->
+                                   ServerNameBin
+                           end,
+              VF = encode_voted_for(ServerName, Node),
+              LA = maps:get(last_applied, Fields, undefined),
+              ets:insert(TblName, {Key, CT, VF, LA}),
+              Acc
+      end,
+      ok,
+      ShuState),
+    ok.
+
+%% Encode voted_for back into ETS representation; inverse of decode_voted_for/1.
+%% Both undefined -> undefined.
+%% Only Node set -> the bare atom (legacy format), as produced by decode.
+%% Only ServerName set -> ServerName (kept for backward compatibility).
+encode_voted_for(undefined, undefined) -> undefined;
+encode_voted_for(undefined, Node) -> Node;
+encode_voted_for(ServerName, undefined) -> ServerName;
+encode_voted_for(ServerName, Node) -> {ServerName, Node}.
+
+%% Migrate from DETS to shu: all rows are written as one batch and synced.
+%% Returns {RowCount, ShuState}. The DETS file is always closed but only
+%% renamed to ".migrated" after a successful sync, so a failed migration
+%% leaves the DETS file in place instead of discarding its contents.
+migrate_from_dets(MetaDets, ShuState0, _TblName) ->
+    {ok, DetsTable} = dets:open_file(ra_log_meta_migration, [{file, MetaDets}]),
+    Result =
+        try
+            Count = dets:info(DetsTable, size),
+            ?INFO("ra_log_meta: migrating ~b records from DETS", [Count]),
+            %% the fold prepends, reverse once to keep DETS traversal order
+            Ops = lists:reverse(
+                    dets:foldl(fun dets_row_to_write_op/2, [], DetsTable)),
+            ?DEBUG("ra_log_meta: migration write ops = ~p", [Ops]),
+            {ok, ShuState1} = shu:write_batch(ShuState0, Ops),
+            {ok, ShuState2} = shu:sync(ShuState1),
+            ?INFO("ra_log_meta: migration completed, wrote to shu and synced", []),
+            {Count, ShuState2}
+        after
+            _ = dets:close(DetsTable)
+        end,
+    %% only reached when the migration above succeeded
+    case file:rename(MetaDets, MetaDets ++ ".migrated") of
+        ok -> ok;
+        {error, Reason} ->
+            ?ERROR("ra_log_meta: failed to rename migrated DETS file ~ts: ~p",
+                   [MetaDets, Reason])
+    end,
+    Result.
+
+%% dets:foldl/3 callback: prepend the shu write operation for one DETS row.
+dets_row_to_write_op({UId, CurrentTerm, VotedFor, LastApplied}, Acc) ->
+    {ServerName, Node} = decode_voted_for(VotedFor),
+    ServerNameBin = case ServerName of
+                        undefined -> undefined;
+                        S when is_atom(S) -> atom_to_binary(S, utf8);
+                        S -> S
+                    end,
+    [{UId, [{current_term, CurrentTerm},
+            {voted_for_name, ServerNameBin},
+            {voted_for_node, Node},
+            {last_applied, LastApplied}]} | Acc].
+
+%% Start async compaction in a monitored worker; no-op if one is running
+-dialyzer({nowarn_function, start_compact/1}).
+start_compact(#?MODULE{compact_pid = undefined, shu = S0} = State) ->
+    {Work, S1} = shu:prepare_compact(S0),
+    {Pid, MRef} = spawn_monitor(fun () -> exit({compact_result, shu:do_compact(Work)}) end),
+    State#?MODULE{shu = S1, compact_pid = Pid, compact_mref = MRef};
+start_compact(#?MODULE{compact_pid = Pid} = State) when is_pid(Pid) ->
+    %% already compacting
+    State.
+ +%% Wait for compaction to finish with timeout (used in terminate) +await_compaction(#?MODULE{compact_mref = MRef, shu = S0}, Timeout) -> + receive + {'DOWN', MRef, process, _Pid, {compact_result, Result}} -> + case shu:finish_compact(Result, S0) of + {ok, S} -> S; + {error, Reason} -> + ?ERROR("ra_log_meta: compaction finish during shutdown failed: ~p", [Reason]), + S0 + end + after Timeout -> + ?ERROR("ra_log_meta: compaction worker did not finish during shutdown", []), + S0 + end. diff --git a/test/coordination_SUITE.erl b/test/coordination_SUITE.erl index 66945002..fcce8141 100644 --- a/test/coordination_SUITE.erl +++ b/test/coordination_SUITE.erl @@ -1393,7 +1393,7 @@ start_peer(N, PrivDir, SysCfg) -> Dir = "'" ++ Dir0 ++ "'", %Host = get_current_host(), Pa = [filename:dirname(code:which(App)) - || App <- [aten, gen_batch_server, seshat, ra]], + || App <- [aten, gen_batch_server, seshat, ra, shu]], Args = ["-pa"] ++ Pa ++ ["-ra", "data_dir", Dir], ct:pal("starting child node ~ts with args ~ts", [N, Args]), {ok, P, S} = ?CT_PEER(#{name => N, args => Args}), diff --git a/test/ra_2_SUITE.erl b/test/ra_2_SUITE.erl index 994f07ca..aeecb907 100644 --- a/test/ra_2_SUITE.erl +++ b/test/ra_2_SUITE.erl @@ -50,7 +50,8 @@ all_tests() -> force_start_follower_as_single_member_nonvoter, force_start_nonvoter_as_single_member, initial_members_query, - recovery_checkpoint_written_on_shutdown + recovery_checkpoint_written_on_shutdown, + create_300_single_member_clusters ]. groups() -> @@ -943,6 +944,48 @@ recovery_checkpoint_written_on_shutdown(Config) -> ok = ra:stop_server(?SYS, ServerId), ok. +create_300_single_member_clusters(Config) -> + PrivDir = ?config(priv_dir, Config), + Node = node(), + erlang:garbage_collect(), + timer:sleep(100), + AtomCountBefore = erlang:system_info(atom_count), + + %% Get list of all atoms before (by dumping to a list and checking memory or something) + %% Actually, let's just assert diff < 1500 for now. 
+ + %% Create 300 single member clusters using binaries for uid. + %% ClusterName must be an atom, so we expect exactly 300 atoms to be created + %% for the process names, plus maybe a few system atoms, but not thousands. + Servers = [begin + ClusterName = list_to_atom("cluster_300_" ++ integer_to_list(I)), + UId = list_to_binary("uid_create_300_" ++ integer_to_list(I)), + ServerId = {ClusterName, Node}, + Conf = conf(ClusterName, UId, ServerId, PrivDir, []), + ok = ra:start_server(?SYS, Conf), + ServerId + end || I <- lists:seq(1, 300)], + + %% Wait for them all to start up + [begin + ok = ra:trigger_election(ServerId), + {ok, _, _} = ra:members(ServerId) + end || ServerId <- Servers], + + erlang:garbage_collect(), + timer:sleep(100), + AtomCountAfter = erlang:system_info(atom_count), + + %% Stop all servers + [ok = ra:stop_server(?SYS, ServerId) || ServerId <- Servers], + + %% Check that the number of atoms hasn't increased by more than 1500 + AtomDiff = AtomCountAfter - AtomCountBefore, + ct:pal("AtomCountBefore: ~p, AtomCountAfter: ~p, Diff: ~p", + [AtomCountBefore, AtomCountAfter, AtomDiff]), + ?assert(AtomDiff < 1500), + ok. + enq_deq_n(N, ServerId) -> enq_deq_n(N, ServerId, []). 
diff --git a/test/ra_SUITE.erl b/test/ra_SUITE.erl index ac1e559b..1b0ba5e3 100644 --- a/test/ra_SUITE.erl +++ b/test/ra_SUITE.erl @@ -1625,7 +1625,7 @@ start_remote_cluster(Num, PrivDir, ClusterName, Machine) -> start_peer(Name, PrivDir) -> Dir = "'" ++ filename:join(PrivDir, Name) ++ "'", Pa = [filename:dirname(code:which(App)) - || App <- [aten, gen_batch_server, seshat, ra]], + || App <- [aten, gen_batch_server, seshat, shu, ra]], ct:pal("starting peer node ~ts for node ~ts with -pa ~ts and data_dir ~ts", [Name, node(), Pa, Dir]), {ok, _P, S} = ?CT_PEER(#{name => Name, args => ["-pa"] ++ Pa ++ ["-ra", "data_dir", Dir]}), diff --git a/test/ra_log_2_SUITE.erl b/test/ra_log_2_SUITE.erl index a4914579..778a3857 100644 --- a/test/ra_log_2_SUITE.erl +++ b/test/ra_log_2_SUITE.erl @@ -1294,9 +1294,14 @@ wal_crash_recover(Config) -> Log0 = ra_log_init(Config, #{resend_window => 1}), Log1 = write_n(1, 50, 2, Log0), % crash the wal + WalPid = whereis(ra_log_wal), + ?assert(is_pid(WalPid)), ok = proc_lib:stop(ra_log_segment_writer), % write something - timer:sleep(100), + await_cond(fun () -> + P = whereis(ra_log_wal), + is_pid(P) andalso P =/= WalPid + end, 100), Log2 = deliver_one_log_events(write_n(50, 75, 2, Log1), 100), spawn(fun () -> proc_lib:stop(ra_log_segment_writer) end), Log3 = write_n(75, 100, 2, Log2), diff --git a/test/ra_log_SUITE.erl b/test/ra_log_SUITE.erl index a18d5b32..3aca082e 100644 --- a/test/ra_log_SUITE.erl +++ b/test/ra_log_SUITE.erl @@ -72,7 +72,8 @@ end_per_group(_, Config) -> init_per_testcase(TestCase, Config) -> Fun = ?config(init_fun, Config), Log = Fun(TestCase), - [{ra_log, Log} | Config]. + UId = atom_to_binary(TestCase, utf8), + [{ra_log, Log}, {uid, UId} | Config]. 
fetch_when_empty(Config) -> Log0 = ?config(ra_log, Config), diff --git a/test/ra_log_meta_SUITE.erl b/test/ra_log_meta_SUITE.erl index 5ec194ee..4745bb7f 100644 --- a/test/ra_log_meta_SUITE.erl +++ b/test/ra_log_meta_SUITE.erl @@ -24,7 +24,8 @@ all() -> all_tests() -> [ roundtrip, - delete + delete, + trigger_compaction ]. groups() -> @@ -42,7 +43,8 @@ end_per_group(_, Config) -> Config. init_per_testcase(TestCase, Config) -> - [{key, TestCase} | Config]. + %% Convert test case name (atom) to binary for use as ra_uid + [{key, atom_to_binary(TestCase, utf8)} | Config]. end_per_testcase(_, Config) -> Config. @@ -61,12 +63,14 @@ roundtrip(Config) -> ok = ra_log_meta:store_sync(ra_log_meta, Id, voted_for, {custard, cream}), {custard, cream} = ra_log_meta:fetch(ra_log_meta, Id, voted_for), %% lose and re-open - proc_lib:stop(whereis(ra_log_meta), killed, infinity), - timer:sleep(200), - % give it some time to restart 199 = ra_log_meta:fetch(ra_log_meta, Id, last_applied), + proc_lib:stop(whereis(ra_log_meta), shutdown, infinity), + timer:sleep(100), + % give it some time to restart and be ready + ok = ra_log_meta:await(ra_log_meta), 5 = ra_log_meta:fetch(ra_log_meta, Id, current_term), {custard, cream} = ra_log_meta:fetch(ra_log_meta, Id, voted_for), + 199 = ra_log_meta:fetch(ra_log_meta, Id, last_applied), ok. delete(Config) -> @@ -80,3 +84,53 @@ delete(Config) -> undefined = ra_log_meta:fetch(ra_log_meta, Oth, last_applied), undefined = ra_log_meta:fetch(ra_log_meta, Id, last_applied), ok. + +trigger_compaction(Config) -> + Id = ?config(key, Config), + %% Write enough data to fill the WAL and trigger a background compaction. + %% Default wal_size is 16MB. To make it fast, we can overwrite a large + %% `voted_for` string (must be {Name, Node}, both binaries or atoms) repeatedly. + %% A voted_for value of {binary, atom} works well. + %% Note: the binary name max size in schema is 255. Let's make it 200 bytes. 
+ LargeName = binary:copy(<<"A">>, 200), + LargeVotedFor = {LargeName, node()}, + + %% Since the entry size will be ~250 bytes, to fill 16MB we need ~65000 writes. + %% This might take 10-15 seconds in a test. That's fine. + [ok = ra_log_meta:store(ra_log_meta, Id, voted_for, LargeVotedFor) || _ <- lists:seq(1, 70000)], + ok = ra_log_meta:store_sync(ra_log_meta, Id, voted_for, {<<"Final">>, node()}), + + %% Verify we can read the final value and that the server is alive + {<<"Final">>, _} = ra_log_meta:fetch(ra_log_meta, Id, voted_for), + ok. + +migrate_from_dets(Config) -> + Id = ?config(key, Config), + PrivDir = ?config(priv_dir, Config), + + %% First, stop ra so we can create a hand-made DETS file + application:stop(ra), + timer:sleep(200), + + %% Create a temporary DETS file with known data + MetaDetsPath = filename:join(PrivDir, "meta.dets"), + {ok, DetsTable} = dets:open_file(test_dets_migration, [{file, MetaDetsPath}]), + dets:insert(DetsTable, {Id, 42, 'node1@host', 100}), + dets:close(DetsTable), + + %% Restart ra - should migrate DETS to shu + {ok, _} = ra:start_in(PrivDir), + timer:sleep(500), + + %% Verify migrated data is accessible via ETS + %% Note: we only test the simple values that round-trip well + 42 = ra_log_meta:fetch(ra_log_meta, Id, current_term), + 100 = ra_log_meta:fetch(ra_log_meta, Id, last_applied), + 'node1@host' = ra_log_meta:fetch(ra_log_meta, Id, voted_for), + + %% Verify DETS file was renamed to .migrated + true = filelib:is_file(MetaDetsPath ++ ".migrated"), + false = filelib:is_file(MetaDetsPath), + + ok. 
+ diff --git a/test/ra_system_SUITE.erl b/test/ra_system_SUITE.erl index 4f70ded6..f6e8b795 100644 --- a/test/ra_system_SUITE.erl +++ b/test/ra_system_SUITE.erl @@ -223,7 +223,7 @@ start_peer(PrivDir) -> Dir0 = filename:join(PrivDir, Name), Dir = "'" ++ Dir0 ++ "'", Pa = [filename:dirname(code:which(App)) - || App <- [aten, gen_batch_server, seshat, ra]], + || App <- [aten, gen_batch_server, seshat, shu, ra]], Args = ["-pa"] ++ Pa ++ ["-ra", "data_dir", Dir], ct:pal("starting child node ~ts for node ~ts~n", [Name, Args]), {ok, P, S} = ?CT_PEER(#{name => Name, args => Args}),