|
| 1 | +%%% % @format |
| 2 | + |
| 3 | +-module(merge_raft_reg). |
| 4 | +-compile(warn_missing_spec_all). |
| 5 | +-moduledoc """ |
| 6 | +a simple gen_server implementing pid register using merge raft |
| 7 | +""". |
| 8 | + |
| 9 | +-behaviour(merge_raft). |
| 10 | + |
| 11 | +%% OTP supervision |
| 12 | +-export([ |
| 13 | + child_spec/0, |
| 14 | + start_link/0, |
| 15 | + start/0 |
| 16 | +]). |
| 17 | + |
| 18 | +%% API functions |
| 19 | +-export([ |
| 20 | + reg/2, |
| 21 | + get/1, |
| 22 | + resolve/3 |
| 23 | +]). |
| 24 | + |
| 25 | +%% merge_raft callbacks |
| 26 | +-export([ |
| 27 | + db_init/2, |
| 28 | + apply_custom/3, |
| 29 | + apply_merge/4, |
| 30 | + apply_leave/3, |
| 31 | + apply_replace/2, |
| 32 | + serialize/1, |
| 33 | + reset/2 |
| 34 | +]). |
| 35 | + |
| 36 | +%% gen_server callbacks |
| 37 | +-export([ |
| 38 | + init/1, |
| 39 | + handle_call/3, |
| 40 | + handle_cast/2, |
| 41 | + handle_info/2 |
| 42 | +]). |
| 43 | + |
| 44 | +-type server_name() :: merge_raft:server_name(). |
| 45 | +-type peer() :: merge_raft:peer(). |
| 46 | +-type members() :: merge_raft:members(). |
| 47 | +-type commit_metadata() :: merge_raft:commit_metadata(). |
| 48 | + |
| 49 | +-type name() :: atom(). |
| 50 | + |
| 51 | +-type custom_db() :: ets:table(). |
| 52 | +-type custom_result() :: ok | boolean(). |
| 53 | +-type custom_log() :: {reg, name(), pid()} | {unreg, name(), pid()}. |
| 54 | +-type custom_db_serialized() :: #{}. |
| 55 | + |
| 56 | +-type state() :: #{name() => pid()}. |
| 57 | + |
| 58 | +-define(RAFT_SERVER, merge_raft_reg_server). |
| 59 | + |
| 60 | +%============================================================================== |
| 61 | +% OTP supervision |
| 62 | +%============================================================================== |
| 63 | + |
| 64 | +-spec child_spec() -> supervisor:child_spec(). |
| 65 | +child_spec() -> |
| 66 | + #{ |
| 67 | + id => ?MODULE, |
| 68 | + start => {?MODULE, start_link, []}, |
| 69 | + restart => transient, |
| 70 | + shutdown => 1000, |
| 71 | + modules => [?MODULE] |
| 72 | + }. |
| 73 | + |
| 74 | +-spec start_link() -> gen_server:start_ret(). |
| 75 | +start_link() -> |
| 76 | + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). |
| 77 | + |
| 78 | +-spec start() -> gen_server:start_ret(). |
| 79 | +start() -> |
| 80 | + gen_server:start({local, ?MODULE}, ?MODULE, [], []). |
| 81 | + |
| 82 | +%============================================================================== |
| 83 | +% API functions |
| 84 | +%============================================================================== |
| 85 | + |
| 86 | +-spec reg(name(), pid()) -> boolean(). |
| 87 | +reg(Name, Pid) -> |
| 88 | + gen_server:call(?MODULE, {reg, Name, Pid}). |
| 89 | + |
| 90 | +-spec get(name()) -> pid() | undefined. |
| 91 | +get(Name) -> |
| 92 | + case ets:lookup(?MODULE, Name) of |
| 93 | + [{_Name, Pid}] -> |
| 94 | + Pid; |
| 95 | + _ -> |
| 96 | + undefined |
| 97 | + end. |
| 98 | + |
| 99 | +%============================================================================== |
| 100 | +% merge_raft callbacks |
| 101 | +%============================================================================== |
| 102 | + |
| 103 | +-spec db_init(peer(), server_name()) -> {undefined | commit_metadata(), custom_db()}. |
| 104 | +db_init(_Me, _Name) -> |
| 105 | + {undefined, ets:new(?MODULE, [set, protected, named_table])}. |
| 106 | + |
| 107 | +-spec apply_custom(commit_metadata(), custom_log(), custom_db()) -> {custom_result(), custom_db()}. |
| 108 | +apply_custom(_CommitMetadata, {reg, Name, Pid}, Db) -> |
| 109 | + case ets:lookup(Db, Name) of |
| 110 | + [{_Name, Pid1}] when Pid1 =:= Pid -> |
| 111 | + ok; |
| 112 | + [{_Name, Pid1}] -> |
| 113 | + ets:insert(Db, {Name, resolve(Name, Pid1, Pid)}); |
| 114 | + _ -> |
| 115 | + ets:insert(Db, {Name, Pid}) |
| 116 | + end, |
| 117 | + {ets:lookup_element(Db, Name, 2) =:= Pid, Db}; |
| 118 | +apply_custom(_CommitMetadata, {unreg, Name, Pid}, Db) -> |
| 119 | + case ets:lookup(Db, Name) of |
| 120 | + [{_Name, Pid}] -> |
| 121 | + ets:delete(Db, Name); |
| 122 | + _ -> |
| 123 | + ok |
| 124 | + end, |
| 125 | + {ok, Db}. |
| 126 | + |
| 127 | +-spec apply_merge(commit_metadata(), members(), custom_db_serialized(), custom_db()) -> custom_db(). |
| 128 | +apply_merge(_CommitMetadata, _Members, Data, Db) -> |
| 129 | + [ |
| 130 | + case ets:lookup(Db, Name) of |
| 131 | + [{_Name, Pid1}] when Pid1 =:= Pid -> |
| 132 | + ok; |
| 133 | + [{_Name, Pid1}] -> |
| 134 | + ets:insert(Db, {Name, resolve(Name, Pid1, Pid)}); |
| 135 | + _ -> |
| 136 | + ets:insert(Db, {Name, Pid}) |
| 137 | + end |
| 138 | + || Name := Pid <- Data |
| 139 | + ], |
| 140 | + Db. |
| 141 | + |
| 142 | +-spec apply_leave(commit_metadata(), peer(), custom_db()) -> custom_db(). |
| 143 | +apply_leave(_CommitMetadata, _Peer, Db) -> |
| 144 | + % remove names related to the peer |
| 145 | + % however we should not remove the names new generation of peer is in members |
| 146 | + Db. |
| 147 | + |
| 148 | +-spec apply_replace(custom_db_serialized(), custom_db()) -> custom_db(). |
| 149 | +apply_replace(Data, Db) -> |
| 150 | + % can do incremental update to have less churn |
| 151 | + ets:delete_all_objects(Db), |
| 152 | + ets:insert(Db, maps:to_list(Data)), |
| 153 | + Db. |
| 154 | + |
| 155 | +-spec serialize(custom_db()) -> custom_db_serialized(). |
| 156 | +serialize(Db) -> |
| 157 | + maps:from_list(ets:tab2list(Db)). |
| 158 | + |
| 159 | +-spec reset(peer(), custom_db()) -> custom_db(). |
| 160 | +reset(_Me, Db) -> |
| 161 | + [ |
| 162 | + ets:delete(Db, Name) |
| 163 | + || {Name, Pid} <- ets:tab2list(Db), |
| 164 | + node(Pid) =:= node(), |
| 165 | + is_process_alive(Pid) |
| 166 | + ], |
| 167 | + Db. |
| 168 | + |
| 169 | +%============================================================================== |
| 170 | +% gen_server callbacks |
| 171 | +%============================================================================== |
| 172 | + |
| 173 | +-spec init([]) -> {ok, state()}. |
| 174 | +init([]) -> |
| 175 | + merge_raft:start_link(#{name => ?RAFT_SERVER, module => ?MODULE}), |
| 176 | + {ok, #{}}. |
| 177 | + |
| 178 | +-spec handle_call(dynamic(), gen_server:from(), state()) -> {reply, boolean(), state()}. |
| 179 | +handle_call({reg, Name, Pid}, _From, State) -> |
| 180 | + case merge_raft:commit_sync(?RAFT_SERVER, {reg, Name, Pid}) of |
| 181 | + {ok, true} -> |
| 182 | + monitor(process, Pid, [{tag, Name}]), |
| 183 | + {reply, true, State}; |
| 184 | + _ -> |
| 185 | + {reply, false, State} |
| 186 | + end. |
| 187 | + |
| 188 | +-spec handle_cast(dynamic(), state()) -> no_return(). |
| 189 | +handle_cast(_Request, _State) -> |
| 190 | + error(badarg). |
| 191 | + |
| 192 | +-spec handle_info({name(), reference(), process, pid(), term()}, state()) -> {noreply, state()}. |
| 193 | +handle_info({Name, _Mon, process, Pid, _Reason}, State) -> |
| 194 | + % should keep retry when failed |
| 195 | + % code example only, don't hanlde failures |
| 196 | + {ok, ok} = merge_raft:commit_sync(?RAFT_SERVER, {unreg, Name, Pid}), |
| 197 | + {noreply, State}. |
| 198 | + |
| 199 | +%============================================================================== |
| 200 | +% internal functions |
| 201 | +%============================================================================== |
| 202 | + |
| 203 | +-spec resolve(name(), pid(), pid()) -> pid(). |
| 204 | +resolve(_Name, Pid1, Pid2) when Pid1 < Pid2 -> |
| 205 | + exit(Pid2, conflict), |
| 206 | + Pid1; |
| 207 | +resolve(_Name, Pid1, Pid2) -> |
| 208 | + exit(Pid1, conflict), |
| 209 | + Pid2. |
0 commit comments