Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 48 additions & 20 deletions doc/unsplit.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

Framework for merging mnesia tables after netsplit.



__Behaviours:__ [`application`](application.md), [`supervisor`](supervisor.md).

__Authors:__ : Ulf Wiger ([`ulf.wiger@erlang-solutions.com`](mailto:ulf.wiger@erlang-solutions.com)).<a name="description"></a>
Expand All @@ -25,38 +23,80 @@ __Authors:__ : Ulf Wiger ([`ulf.wiger@erlang-solutions.com`](mailto:ulf.wiger@er
##Function Index##


<table width="100%" border="1" cellspacing="0" cellpadding="2" summary="function index"><tr><td valign="top"><a href="#get_reporter-0">get_reporter/0</a></td><td>Look up the predefined callback module for reporting inconsistencies.</td></tr><tr><td valign="top"><a href="#report_inconsistency-4">report_inconsistency/4</a></td><td>Report an inconcistency to the predefined reporter.</td></tr><tr><td valign="top"><a href="#report_inconsistency-5">report_inconsistency/5</a></td><td>Report an inconsistency to Reporter (an unsplit_reporter behaviour).</td></tr><tr><td valign="top"><a href="#start-2">start/2</a></td><td>Application start callback.</td></tr><tr><td valign="top"><a href="#stop-1">stop/1</a></td><td>Application stop callback.</td></tr></table>
<table width="100%" border="1" cellspacing="0" cellpadding="2" summary="function index"><tr><td valign="top"><a href="#default_log_fun-2">default_log_fun/2</a></td><td>Default implementation of an unsplit log function.</td></tr><tr><td valign="top"><a href="#get_logger-0">get_logger/0</a></td><td>Look up the predefined callback function for logging.</td></tr><tr><td valign="top"><a href="#get_reporter-0">get_reporter/0</a></td><td>Look up the predefined callback module for reporting inconsistencies.</td></tr><tr><td valign="top"><a href="#log_write-2">log_write/2</a></td><td>Writes a log message to the predefined logger.</td></tr><tr><td valign="top"><a href="#log_write-3">log_write/3</a></td><td>Writes a formatted log message to the predefined logger.</td></tr><tr><td valign="top"><a href="#log_write-4">log_write/4</a></td><td>Writes a formatted log message to the predefined logger.</td></tr><tr><td valign="top"><a href="#report_inconsistency-4">report_inconsistency/4</a></td><td>Report an inconcistency to the predefined reporter.</td></tr><tr><td valign="top"><a href="#report_inconsistency-5">report_inconsistency/5</a></td><td>Report an inconsistency to Reporter (an unsplit_reporter behaviour).</td></tr><tr><td valign="top"><a href="#start-2">start/2</a></td><td>Application start callback.</td></tr><tr><td valign="top"><a href="#stop-1">stop/1</a></td><td>Application stop callback.</td></tr></table>


<a name="functions"></a>

##Function Details##

<a name="default_log_fun-2"></a>

###default_log_fun/2##


<pre>default_log_fun(LogType::<a href="unsplit.md#type-log_type">unsplit:log_type()</a>, Message::string()) -> ok</pre>
<br></br>


Default implementation of an unsplit log function
<a name="get_logger-0"></a>

###get_logger/0##


<pre>get_logger() -> <a href="unsplit.md#type-log_fun">unsplit:log_fun()</a></pre>
<br></br>


Look up the predefined callback function for logging
<a name="get_reporter-0"></a>

###get_reporter/0##


<pre>get_reporter() -&gt; module()</pre>
<br></br>


<pre>get_reporter() -&gt; module()</pre>
Look up the predefined callback module for reporting inconsistencies
<a name="log_write-2"></a>

###log_write/2##


<pre>log_write(LogType::<a href="unsplit.md#type-log_type">unsplit:log_type()</a>, Message::string()) -> ok</pre>
<br></br>


Writes a log message to the predefined logger
<a name="log_write-3"></a>

###log_write/3##

Look up the predefined callback module for reporting inconsistencies
<a name="report_inconsistency-4"></a>

###report_inconsistency/4##
<pre>log_write(LogType::<a href="unsplit.md#type-log_type">unsplit:log_type()</a>, Format::string(), Data::[any()]) -> ok</pre>
<br></br>


Writes a formatted log message to the predefined logger
<a name="log_write-4"></a>

###log_write/4##

<pre>report_inconsistency(Tab::Table, Key, ObjA::ObjectA, ObjB::ObjectB) -&gt; ok</pre>

<pre>log_write(Logger::<a href="unsplit.md#type-log_fun">unsplit:log_fun()</a>, LogType::<a href="unsplit.md#type-log_type">unsplit:log_type()</a>, Format::string(), Data::[any()]) -> ok</pre>
<br></br>


Writes a formatted log message to the predefined logger
<a name="report_inconsistency-4"></a>

###report_inconsistency/4##


<pre>report_inconsistency(Tab::Table, Key, ObjA::ObjectA, ObjB::ObjectB) -&gt; ok</pre>
<br></br>


Report an inconcistency to the predefined reporter
Expand All @@ -65,40 +105,28 @@ Report an inconcistency to the predefined reporter
###report_inconsistency/5##




<pre>report_inconsistency(Reporter, Tab::Table, Key, ObjA::ObjectA, ObjB::ObjectB) -&gt; ok</pre>
<br></br>




Report an inconsistency to Reporter (an unsplit_reporter behaviour)
<a name="start-2"></a>

###start/2##




<pre>start(X1::Type, X2::Arg) -&gt; {ok, pid()}</pre>
<br></br>




Application start callback
<a name="stop-1"></a>

###stop/1##




<pre>stop(X1::State) -&gt; ok</pre>
<br></br>




Application stop callback
5 changes: 5 additions & 0 deletions include/unsplit.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,8 @@
| {ok, any()}
| {ok, merge_actions(), any()}
| {ok, merge_actions(), merge_strategy(), any()}.

-type log_fun() :: fun((LogType :: log_type(), Message :: string()) -> 'ok').

-type log_type() :: 'error' | 'normal'.

3 changes: 2 additions & 1 deletion src/unsplit.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
{registered, []},
{mod, {unsplit, []}},
{env, [
{reporter, unsplit_reporter}
{reporter, unsplit_reporter},
{log_fun, {unsplit, default_log_fun}}
]}
]}.
38 changes: 38 additions & 0 deletions src/unsplit.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
-export([get_reporter/0,
report_inconsistency/4, report_inconsistency/5]).

-export([get_logger/0,
default_log_fun/2,
log_write/2, log_write/3, log_write/4]).


%% application start/stop API
-export([start/2, stop/1]).
Expand All @@ -60,6 +64,40 @@ report_inconsistency(Reporter, Tab, Key, ObjA, ObjB) ->
Reporter:inconsistency(Tab, Key, ObjA, ObjB).


%% @spec get_logger() -> unsplit:log_fun()
%% @doc Look up the predefined callback function for logging
%%
get_logger() ->
{ok, {Module, FunctionName}} = application:get_env(unsplit, log_fun),
fun(LogType, Message) -> Module:FunctionName(LogType, Message) end.

%% @spec default_log_fun(LogType :: unsplit:log_type(), Message :: string()) -> 'ok'
%% @doc Default implementation of an unsplit log function
%%
default_log_fun(normal, Message) ->
io:fwrite(Message);
default_log_fun(error, Message) ->
error_logger:format(Message, []).

%% @spec log_write(LogType :: unsplit:log_type(), Message :: string()) -> 'ok'
%% @doc Writes a log message to the predefined logger
%%
log_write(LogType, Message) ->
log_write(LogType, Message, []).

%% @spec log_write(LogType :: unsplit:log_type(), Format :: string(), Data :: [any()]) -> 'ok'
%% @doc Writes a formatted log message to the predefined logger
%%
log_write(LogType, Format, Data) ->
log_write(get_logger(), LogType, Format, Data).

%% @spec log_write(Logger :: unsplit:log_fun(), LogType :: unsplit:log_type(), Format :: string(), Data :: [any()]) -> 'ok'
%% @doc Writes a formatted log message to the predefined logger
%%
log_write(Logger, LogType, Format, Data) ->
Logger(LogType, io_lib:format(Format, Data)).


%% @spec start(Type, Arg) -> {ok, pid()}
%% @doc Application start callback
%%
Expand Down
18 changes: 9 additions & 9 deletions src/unsplit_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
%% @end
%%
no_action(init, [Tab|_]) ->
error_logger:format("Will not merge table ~p~n", [Tab]),
unsplit:log_write(error, "Will not merge table ~p~n", [Tab]),
stop.

%% @spec last_modified(Phase, State) -> merge_ret()
Expand Down Expand Up @@ -81,11 +81,11 @@ bag(Objs, S) ->
last_version(init, [Tab, Attrs, Attr]) ->
case lists:member(Attr, Attrs) of
false ->
error_logger:format("Cannot merge table ~p."
"Missing ~p attribute~n", [Tab, Attr]),
unsplit:log_write(error, "Cannot merge table ~p."
"Missing ~p attribute~n", [Tab, Attr]),
stop;
true ->
io:fwrite("Starting merge of ~p (~p)~n", [Tab, Attrs]),
unsplit:log_write(normal, "Starting merge of ~p (~p)~n", [Tab, Attrs]),
{ok, {Tab, pos(Attr, Tab, Attrs)}}
end;
last_version(done, _S) ->
Expand All @@ -99,11 +99,11 @@ last_version(Objs, {T, P} = S) when is_list(Objs) ->
vclock(init, [Tab, Attrs, Attr]) ->
case lists:member(Attr, Attrs) of
false ->
error_logger:format("Cannot merge table ~p."
"Missing ~p attribute~n", [Tab, Attr]),
unsplit:log_write(error, "Cannot merge table ~p."
"Missing ~p attribute~n", [Tab, Attr]),
stop;
true ->
io:fwrite("Starting merge of ~p (~p)~n", [Tab, Attrs]),
unsplit:log_write(normal, "Starting merge of ~p (~p)~n", [Tab, Attrs]),
{ok, {Tab, pos(Attr, Tab, Attrs)}}
end;
vclock(done, _) ->
Expand All @@ -127,14 +127,14 @@ vclock(Objs, {T, P} = S) ->
{ok, Actions, same, S}.

last_version_entry(Obj, T, P) ->
io:fwrite("last_version_entry(~p)~n", [Obj]),
unsplit:log_write(normal, "last_version_entry(~p)~n", [Obj]),
compare(Obj, T, P, fun(A, B) when A < B -> left;
(A, B) when A > B -> right;
(_, _) -> neither
end).

compare(Obj, T, P, Comp) ->
io:fwrite("compare(~p)~n", [Obj]),
unsplit:log_write("compare(~p)~n", [Obj]),
case Obj of
{A, []} -> {write, A};
{[], B} -> {write, B};
Expand Down
30 changes: 15 additions & 15 deletions src/unsplit_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -104,17 +104,17 @@ handle_cast(_Msg, State) ->
%%--------------------------------------------------------------------
handle_info({mnesia_system_event,
{inconsistent_database, Context, Node}}, State) ->
io:fwrite("inconsistency. Context = ~p; Node = ~p~n", [Context, Node]),
unsplit:log_write(normal, "inconsistency. Context = ~p; Node = ~p~n", [Context, Node]),
Res = global:trans(
{?LOCK, self()},
fun() ->
io:fwrite("have lock...~n", []),
unsplit:log_write(normal, "have lock...~n", []),
stitch_together(node(), Node)
end),
io:fwrite("Res = ~p~n", [Res]),
unsplit:log_write(normal, "Res = ~p~n", [Res]),
{noreply, State};
handle_info(_Info, State) ->
io:fwrite("Got event: ~p~n", [_Info]),
unsplit:log_write(normal, "Got event: ~p~n", [_Info]),
{noreply, State}.

%%--------------------------------------------------------------------
Expand Down Expand Up @@ -142,7 +142,7 @@ code_change(_OldVsn, State, _Extra) ->
stitch_together(NodeA, NodeB) ->
case lists:member(NodeB, mnesia:system_info(running_db_nodes)) of
true ->
io:fwrite("~p already stitched, it seems. All is well.~n", [NodeB]),
unsplit:log_write(normal, "~p already stitched, it seems. All is well.~n", [NodeB]),
ok;
false ->
do_stitch_together(NodeA, NodeB)
Expand All @@ -152,14 +152,14 @@ do_stitch_together(NodeA, NodeB) ->
[IslandA, IslandB] =
[rpc:call(N, mnesia, system_info, [running_db_nodes]) ||
N <- [NodeA, NodeB]],
io:fwrite("IslandA = ~p;~nIslandB = ~p~n", [IslandA, IslandB]),
unsplit:log_write(normal, "IslandA = ~p;~nIslandB = ~p~n", [IslandA, IslandB]),
TabsAndNodes = affected_tables(IslandA, IslandB),
Tabs = [T || {T,_} <- TabsAndNodes],
io:fwrite("Affected tabs = ~p~n", [Tabs]),
unsplit:log_write(normal, "Affected tabs = ~p~n", [Tabs]),
DefaultMethod = default_method(),
TabMethods = [{T, Ns, get_method(T, DefaultMethod)}
|| {T,Ns} <- TabsAndNodes],
io:fwrite("Methods = ~p~n", [TabMethods]),
unsplit:log_write(normal, "Methods = ~p~n", [TabMethods]),
mnesia_controller:connect_nodes(
[NodeB],
fun(MergeF) ->
Expand All @@ -169,7 +169,7 @@ do_stitch_together(NodeA, NodeB) ->
%% For now, assume that we have merged with the right
%% node, and not with others that could also be
%% consistent (mnesia gurus, how does this work?)
io:fwrite("stitching: ~p~n", [TabMethods]),
unsplit:log_write(normal, "stitching: ~p~n", [TabMethods]),
stitch_tabs(TabMethods, NodeB),
Res;
Other ->
Expand All @@ -181,7 +181,7 @@ show_locks(OtherNode) ->
Info = [{node(), mnesia_locker:get_held_locks()},
{OtherNode, rpc:call(OtherNode,
mnesia_locker,get_held_locks,[])}],
io:fwrite("Held locks = ~p~n", [Info]).
unsplit:log_write(normal, "Held locks = ~p~n", [Info]).


stitch_tabs(TabMethods, NodeB) ->
Expand All @@ -193,16 +193,16 @@ stitch_tabs(TabMethods, NodeB) ->


do_stitch({Tab, Ns, {M, F, XArgs}} = TM, Remote) ->
io:fwrite("do_stitch(~p, ~p).~n", [TM,Remote]),
unsplit:log_write(normal, "do_stitch(~p, ~p).~n", [TM,Remote]),
HasCopy = lists:member(Remote, Ns),
io:fwrite("~p has a copy of ~p? -> ~p~n", [Remote, Tab, HasCopy]),
unsplit:log_write(normal, "~p has a copy of ~p? -> ~p~n", [Remote, Tab, HasCopy]),
Attrs = mnesia:table_info(Tab, attributes),
S0 = #st{module = M, function = F, extra_args = XArgs,
table = Tab, attributes = Attrs,
remote = Remote,
chunk = get_table_chunk_factor(Tab),
strategy = default_strategy()},
io:fwrite("Calling ~p:~p(init, ~p)", [M,F,[Tab,Attrs|XArgs]]),
unsplit:log_write(normal, "Calling ~p:~p(init, ~p)", [M,F,[Tab,Attrs|XArgs]]),
try
run_stitch(check_return(M:F(init, [Tab, Attrs | XArgs]), S0))
catch
Expand All @@ -213,7 +213,7 @@ do_stitch({Tab, Ns, {M, F, XArgs}} = TM, Remote) ->
-spec check_return(unsplit:merge_ret(), #st{}) -> #st{}.

check_return(Ret, S) ->
io:fwrite(" -> ~p~n", [Ret]),
unsplit:log_write(normal, " -> ~p~n", [Ret]),
case Ret of
stop -> throw(?DONE);
{ok, St} ->
Expand Down Expand Up @@ -308,7 +308,7 @@ affected_tables(IslandA, IslandB) ->
Nodes = lists:concat(
[mnesia:table_info(T, C) ||
C <- backend_types()]),
io:fwrite("nodes_of(~p) = ~p~n", [T, Nodes]),
unsplit:log_write(normal, "nodes_of(~p) = ~p~n", [T, Nodes]),
case {intersection(IslandA, Nodes),
intersection(IslandB, Nodes)} of
{[_|_], [_|_]} ->
Expand Down