diff --git a/include/eredis_cluster.hrl b/include/eredis_cluster.hrl index 6efdfa6..16ab709 100644 --- a/include/eredis_cluster.hrl +++ b/include/eredis_cluster.hrl @@ -24,7 +24,9 @@ start_slot :: integer(), end_slot :: integer(), index :: integer(), - node :: #node{} + type :: atom(), + node :: #node{}, + replicas :: [#node{}] }). -define(REDIS_CLUSTER_HASH_SLOTS, 16384). diff --git a/rebar.config b/rebar.config index 08c34c1..63f3c18 100644 --- a/rebar.config +++ b/rebar.config @@ -1,4 +1,5 @@ -{erl_opts, [warnings_as_errors, +{erl_opts, [{platform_define, "^17\.", 'ERLANG_OTP_VERSION_17'}, + warnings_as_errors, warn_export_all]}. {xref_checks, [undefined_function_calls]}. diff --git a/src/eredis_cluster.erl b/src/eredis_cluster.erl index 1da3a9f..01d6f4f 100644 --- a/src/eredis_cluster.erl +++ b/src/eredis_cluster.erl @@ -6,7 +6,7 @@ -export([stop/1]). % API. --export([start/0, stop/0, connect/1]). % Application Management. +-export([start/0, stop/0, connect/1, connect/2]). % Application Management. % Generic redis call -export([q/1, qp/1, qw/2, qk/2, qa/1, qmn/1, transaction/1, transaction/2]). @@ -48,6 +48,18 @@ stop() -> connect(InitServers) -> eredis_cluster_monitor:connect(InitServers). +-spec connect(InitServers::term(), ReplicaReads::boolean()) -> Result::term(). +connect(InitServers, ReplicaReads) -> + case ReplicaReads of + true -> + eredis_cluster_monitor:connect(InitServers, ReplicaReads); + false -> + connect(InitServers); + _ -> + throw({error, configuration_error}) + end. + + %% ============================================================================= %% @doc Wrapper function to execute a pipeline command as a transaction Command %% (it will add MULTI and EXEC command) @@ -124,8 +136,9 @@ split_by_pools(Commands) -> split_by_pools([Command | T], Index, CmdAcc, MapAcc, State) -> Key = get_key_from_command(Command), + CommandType = get_command_type(Command), Slot = get_key_slot(Key), - {Pool, _Version} = eredis_cluster_monitor:get_pool_by_slot(Slot, State), + {Pool, _Version} = eredis_cluster_monitor:get_pool_by_slot(Slot, State, CommandType), {NewAcc1, NewAcc2} = case lists:keyfind(Pool, 1, CmdAcc) of false -> @@ -172,20 +185,25 @@ query(_, undefined) -> {error, invalid_cluster_command}; query(Command, PoolKey) -> Slot = get_key_slot(PoolKey), + CommandType = get_command_type(Command), Transaction = fun(Worker) -> qw(Worker, Command) end, - query(Transaction, Slot, 0). + query(Transaction, Slot, 0, CommandType). query(_, _, ?REDIS_CLUSTER_REQUEST_TTL) -> {error, no_connection}; + query(Transaction, Slot, Counter) -> - %% Throttle retries - throttle_retries(Counter), + %% In case of a transcation where type of the node can not be judged, it should be sent to master + query(Transaction, Slot, Counter, write). - {Pool, Version} = eredis_cluster_monitor:get_pool_by_slot(Slot), +query(Transaction, Slot, Counter, CommandType) -> + %% Throttle retries + throttle_retries(Counter), + {Pool, Version} = eredis_cluster_monitor:get_pool_by_slot(Slot, CommandType), Result = eredis_cluster_pool:transaction(Pool, Transaction), case handle_transaction_result(Result, Version) of - retry -> query(Transaction, Slot, Counter + 1); + retry -> query(Transaction, Slot, Counter + 1, CommandType); Result -> Result end. @@ -338,7 +356,7 @@ eval(Script, ScriptHash, Keys, Args) -> %% ============================================================================= -spec qa(redis_command()) -> ok | {error, Reason::bitstring()}. qa(Command) -> - Pools = eredis_cluster_monitor:get_all_pools(), + Pools = eredis_cluster_monitor:get_all_master_pools(), Transaction = fun(Worker) -> qw(Worker, Command) end, [eredis_cluster_pool:transaction(Pool, Transaction) || Pool <- Pools]. @@ -442,6 +460,146 @@ get_key_from_command([Term1,Term2|Rest]) -> get_key_from_command(_) -> undefined. +%% ============================================================================= +%% @doc Return the command type. It can be either read or write +%% The command list has been compiled with command from redis and filtering only +%% the readonly queries +%% @end +%% ============================================================================= + + +get_command_type([Term1|Rest]) when is_bitstring(Term1) -> + get_command_type([bitstring_to_list(Term1)|Rest]); +get_command_type([Term1|_]) -> + case string:to_lower(Term1) of + "zrange" -> + read; + "touch" -> + read; + "zrevrangebylex" -> + read; + "dump" -> + read; + "zcount" -> + read; + "hmget" -> + read; + "hgetall" -> + read; + "sinter" -> + read; + "georadius_ro" -> + read; + "exists" -> + read; + "zrevrank" -> + read; + "zrangebyscore" -> + read; + "dbsize" -> + read; + "lrange" -> + read; + "type" -> + read; + "zlexcount" -> + read; + "zrank" -> + read; + "llen" -> + read; + "geodist" -> + read; + "bitcount" -> + read; + "zrangebylex" -> + read; + "sdiff" -> + read; + "hscan" -> + read; + "hexists" -> + read; + "randomkey" -> + read; + "zrevrangebyscore" -> + read; + "zrevrange" -> + read; + "pttl" -> + read; + "zcard" -> + read; + "ttl" -> + read; + "zscore" -> + read; + "smembers" -> + read; + "scan" -> + read; + "substr" -> + read; + "geohash" -> + read; + "georadiusbymember_ro" -> + read; + "geopos" -> + read; + "getrange" -> + read; + "strlen" -> + read; + "sscan" -> + read; + "hkeys" -> + read; + "sismember" -> + read; + "hstrlen" -> + read; + "mget" -> + read; + "get" -> + read; + "srandmember" -> + read; + "hvals" -> + read; + "psync" -> + read; + "sync" -> + read; + "object" -> + read; + "pfcount" -> + read; + "bitpos" -> + read; + "lindex" -> + read; + "keys" -> + read; + "getbit" -> + read; + "memory" -> + read; + "scard" -> + read; + "zscan" -> + read; + "hget" -> + read; + "sunion" -> + read; + "hlen" -> + read; + _ -> + write + end; +get_command_type(_) -> + write. %% Marking it write command for just in case + %% ============================================================================= %% @doc Get key for command where the key is in th 4th position (eval and %% evalsha commands) diff --git a/src/eredis_cluster_monitor.erl b/src/eredis_cluster_monitor.erl index 5a74f45..c8256c4 100644 --- a/src/eredis_cluster_monitor.erl +++ b/src/eredis_cluster_monitor.erl @@ -4,11 +4,15 @@ %% API. -export([start_link/0]). -export([connect/1]). +-export([connect/2]). -export([refresh_mapping/1]). -export([get_state/0, get_state_version/1]). --export([get_pool_by_slot/1, get_pool_by_slot/2]). +-export([get_pool_by_slot/1, get_pool_by_slot/2, get_pool_by_slot/3]). +-export([get_write_pool_by_slot/2]). +-export([get_all_master_pools/0]). -export([get_all_pools/0]). + %% gen_server. -export([init/1]). -export([handle_call/3]). @@ -17,13 +21,17 @@ -export([terminate/2]). -export([code_change/3]). + + %% Type definition. -include("eredis_cluster.hrl"). + -record(state, { init_nodes :: [#node{}], slots :: tuple(), %% whose elements are integer indexes into slots_maps - slots_maps :: tuple(), %% whose elements are #slots_map{} - version :: integer() + slots_maps :: list(), %% whose elements are #slots_map{} + version :: integer(), + replica_read_flag :: boolean() }). %% API. @@ -32,7 +40,10 @@ start_link() -> gen_server:start_link({local,?MODULE}, ?MODULE, [], []). connect(InitServers) -> - gen_server:call(?MODULE,{connect,InitServers}). + connect(InitServers, false). + +connect(InitServers, ReplicaReadsFlag) -> + gen_server:call(?MODULE,{connect,InitServers, ReplicaReadsFlag}). refresh_mapping(Version) -> gen_server:call(?MODULE,{reload_slots_map,Version}). @@ -53,6 +64,13 @@ get_state_version(State) -> -spec get_all_pools() -> [pid()]. get_all_pools() -> + State = get_state(), + SlotsMapList = tuple_to_list(State#state.slots_maps), + lists:flatmap(fun(SlotMap) -> get_pools_in_slots_map(SlotMap) end, SlotsMapList). + + +-spec get_all_master_pools() -> [pid()]. +get_all_master_pools() -> State = get_state(), SlotsMapList = tuple_to_list(State#state.slots_maps), [SlotsMap#slots_map.node#node.pool || SlotsMap <- SlotsMapList, @@ -63,23 +81,46 @@ get_all_pools() -> %% to prevent from querying ets inside loops. %% @end %% ============================================================================= --spec get_pool_by_slot(Slot::integer(), State::#state{}) -> +-spec get_pool_by_slot(Slot::integer(), State::#state{}, CommandType::atom()) -> {PoolName::atom() | undefined, Version::integer()}. -get_pool_by_slot(Slot, State) -> +get_pool_by_slot(Slot, State, CommandType) -> Index = element(Slot+1,State#state.slots), - Cluster = element(Index,State#state.slots_maps), + MasterSlotMap = element(Index, State#state.slots_maps), + ClusterNode = case {CommandType, State#state.replica_read_flag} of + {read, true} -> + ReadClusters = MasterSlotMap#slots_map.replicas, + case ReadClusters of + [] -> + MasterSlotMap#slots_map.node; + _ -> + lists:nth(eredis_cluster_util:get_random_number(length(ReadClusters)), ReadClusters) %% Gets a random element from the read replicas + end; + _ -> + MasterSlotMap#slots_map.node + end, if - Cluster#slots_map.node =/= undefined -> - {Cluster#slots_map.node#node.pool, State#state.version}; + ClusterNode =/= undefined -> + {ClusterNode#node.pool, State#state.version}; true -> {undefined, State#state.version} end. +-spec get_pool_by_slot(Slot::integer(), CommandType::atom()) -> + {PoolName::atom() | undefined, Version::integer()}. +get_pool_by_slot(Slot, CommandType) when is_atom(CommandType)-> + State = get_state(), + get_pool_by_slot(Slot, State, CommandType). + -spec get_pool_by_slot(Slot::integer()) -> {PoolName::atom() | undefined, Version::integer()}. get_pool_by_slot(Slot) -> - State = get_state(), - get_pool_by_slot(Slot, State). + get_pool_by_slot(Slot, write). + +-spec get_write_pool_by_slot(Slot::integer(), State::#state{}) -> + {PoolName::atom() | undefined, Version::integer()}. +get_write_pool_by_slot(Slot, State) -> + get_pool_by_slot(Slot, State, write). + -spec reload_slots_map(State::#state{}) -> NewState::#state{}. reload_slots_map(State) -> @@ -88,10 +129,10 @@ reload_slots_map(State) -> ClusterSlots = get_cluster_slots(State#state.init_nodes), - SlotsMaps = parse_cluster_slots(ClusterSlots), + SlotsMaps = parse_cluster_slots(ClusterSlots, State#state.replica_read_flag), ConnectedSlotsMaps = connect_all_slots(SlotsMaps), - Slots = create_slots_cache(ConnectedSlotsMaps), + Slots = create_slots_cache(ConnectedSlotsMaps), NewState = State#state{ slots = list_to_tuple(Slots), slots_maps = list_to_tuple(ConnectedSlotsMaps), @@ -99,7 +140,7 @@ reload_slots_map(State) -> }, true = ets:insert(?MODULE, [{cluster_state, NewState}]), - + NewState. -spec get_cluster_slots([#node{}]) -> [[bitstring() | [bitstring()]]]. @@ -130,33 +171,38 @@ get_cluster_slots_from_single_node(Node) -> [[<<"0">>, integer_to_binary(?REDIS_CLUSTER_HASH_SLOTS-1), [list_to_binary(Node#node.address), integer_to_binary(Node#node.port)]]]. --spec parse_cluster_slots([[bitstring() | [bitstring()]]]) -> [#slots_map{}]. -parse_cluster_slots(ClusterInfo) -> - parse_cluster_slots(ClusterInfo, 1, []). +-spec parse_cluster_slots([[bitstring() | [bitstring()]]], boolean()) -> [#slots_map{}]. +parse_cluster_slots(ClusterInfo, ReplicaReadsFlag) -> + parse_cluster_slots(ClusterInfo, 1, [], ReplicaReadsFlag). -parse_cluster_slots([[StartSlot, EndSlot | [[Address, Port | _] | _]] | T], Index, Acc) -> +parse_cluster_slots([[StartSlot, EndSlot | [[Address, Port | _] | ReplicaSlots]] | T], Index, Acc, ReplicaReadsFlag) -> SlotsMap = #slots_map{ index = Index, start_slot = binary_to_integer(StartSlot), end_slot = binary_to_integer(EndSlot), + type = master, node = #node{ address = binary_to_list(Address), port = binary_to_integer(Port) - } + }, + replicas = case ReplicaReadsFlag of + true -> + [#node{address = binary_to_list(AddressR), port = binary_to_integer(PortR)} || [AddressR, PortR | _ ] <- ReplicaSlots]; + _ -> + [] + end }, - parse_cluster_slots(T, Index+1, [SlotsMap | Acc]); -parse_cluster_slots([], _Index, Acc) -> + parse_cluster_slots(T, Index+1, [SlotsMap | Acc], ReplicaReadsFlag); +parse_cluster_slots([], _Index, Acc, _) -> lists:reverse(Acc). - - -spec close_connection(#slots_map{}) -> ok. close_connection(SlotsMap) -> Node = SlotsMap#slots_map.node, if Node =/= undefined -> - try eredis_cluster_pool:stop(Node#node.pool) of + try {eredis_cluster_pool:stop(Node#node.pool), lists:foreach(fun(ReplicaNode) -> eredis_cluster_pool:stop(ReplicaNode#node.pool) end, SlotsMap#slots_map.replicas)} of _ -> ok catch @@ -189,42 +235,64 @@ create_slots_cache(SlotsMaps) -> SlotsCache = [[{Index,SlotsMap#slots_map.index} || Index <- lists:seq(SlotsMap#slots_map.start_slot, SlotsMap#slots_map.end_slot)] - || SlotsMap <- SlotsMaps], + || SlotsMap <- SlotsMaps, SlotsMap#slots_map.type == master], %% Only make cache slot from master nodes SlotsCacheF = lists:flatten(SlotsCache), SortedSlotsCache = lists:sort(SlotsCacheF), [ Index || {_,Index} <- SortedSlotsCache]. -spec connect_all_slots([#slots_map{}]) -> [integer()]. connect_all_slots(SlotsMapList) -> - [SlotsMap#slots_map{node=connect_node(SlotsMap#slots_map.node)} - || SlotsMap <- SlotsMapList]. - --spec connect_([{Address::string(), Port::integer()}]) -> #state{}. -connect_([]) -> - #state{}; -connect_(InitNodes) -> + ConnectedSlotsMaps = [SlotsMap#slots_map{node=connect_node(SlotsMap#slots_map.node), replicas=[connect_node(Replica) || Replica <- SlotsMap#slots_map.replicas]} + || SlotsMap <- SlotsMapList], + lists:foreach( + fun(ConnectedSlot) -> + Replicas = ConnectedSlot#slots_map.replicas, + Command = ["READONLY"], + [eredis_cluster_pool:transaction_all(Replica#node.pool, Command) || Replica <- Replicas] + end, + ConnectedSlotsMaps + ), + ConnectedSlotsMaps. + +-spec connect_([{Address::string(), Port::integer()}], ReplicaReadsFlag::boolean()) -> #state{}. +connect_([], ReplicaReadsFlag) -> + #state{ + replica_read_flag = ReplicaReadsFlag + }; +connect_(InitNodes, ReplicaReadsFlag) -> State = #state{ slots = undefined, slots_maps = {}, init_nodes = [#node{address = A, port = P} || {A,P} <- InitNodes], - version = 0 + version = 0, + replica_read_flag = ReplicaReadsFlag }, - reload_slots_map(State). +get_pools_in_slots_map(SlotsMap) when SlotsMap#slots_map.node =/= undefined -> + [SlotsMap#slots_map.node#node.pool] ++ [Replica#node.pool || Replica <- SlotsMap#slots_map.replicas]; + +get_pools_in_slots_map(_) -> +[]. %% gen_server. init(_Args) -> ets:new(?MODULE, [protected, set, named_table, {read_concurrency, true}]), InitNodes = application:get_env(eredis_cluster, init_nodes, []), - {ok, connect_(InitNodes)}. + ReplicaReadsFlag = case application:get_env(eredis_cluster, replica_read_flag, undefined) of + [] -> false; + [true] -> true; + true -> true; + _ -> false + end, + {ok, connect_(InitNodes, ReplicaReadsFlag)}. handle_call({reload_slots_map,Version}, _From, #state{version=Version} = State) -> {reply, ok, reload_slots_map(State)}; handle_call({reload_slots_map,_}, _From, State) -> {reply, ok, State}; -handle_call({connect, InitServers}, _From, _State) -> - {reply, ok, connect_(InitServers)}; +handle_call({connect, InitServers, ReplicaReadsFlag}, _From, _State) -> + {reply, ok, connect_(InitServers, ReplicaReadsFlag)}; handle_call(_Request, _From, State) -> {reply, ignored, State}. diff --git a/src/eredis_cluster_pool.erl b/src/eredis_cluster_pool.erl index 877b992..0c0c065 100644 --- a/src/eredis_cluster_pool.erl +++ b/src/eredis_cluster_pool.erl @@ -1,10 +1,12 @@ -module(eredis_cluster_pool). -behaviour(supervisor). + %% API. -export([create/2]). -export([stop/1]). -export([transaction/2]). +-export([transaction_all/2]). %% Supervisor -export([start_link/0]). @@ -53,6 +55,24 @@ transaction(PoolName, Transaction) -> {error, no_connection} end. +%% this executes a transaction in all the pools +%% used mainly for broadcasting readonly query to all the slaves +-spec transaction_all(PoolName::atom(), Command :: list()) -> + ok. +transaction_all(PoolName, Command) -> + try + Workers = gen_server:call(PoolName, get_avail_workers), + lists:foreach( fun(Worker) -> + eredis_cluster_pool_worker:query(Worker, Command) + end, + Workers + ), + ok + catch + exit:_ -> + throw({error,cannot_issue_readonly_to_slaves}) + end. + -spec stop(PoolName::atom()) -> ok. stop(PoolName) -> supervisor:terminate_child(?MODULE,PoolName), @@ -61,7 +81,7 @@ stop(PoolName) -> -spec get_name(Host::string(), Port::integer()) -> PoolName::atom(). get_name(Host, Port) -> - list_to_atom(Host ++ "#" ++ integer_to_list(Port)). + list_to_atom(Host ++ "#" ++ integer_to_list(Port)). %% Generate a random name for the pool -spec start_link() -> {ok, pid()}. start_link() -> diff --git a/src/eredis_cluster_util.erl b/src/eredis_cluster_util.erl new file mode 100644 index 0000000..9b01e27 --- /dev/null +++ b/src/eredis_cluster_util.erl @@ -0,0 +1,9 @@ +-module(eredis_cluster_util). + +-export([get_random_number/1]). + +-ifdef(ERLANG_OTP_VERSION_17). +get_random_number(N) -> random:seed(now()), random:uniform(N). +-else. +get_random_number(N) -> rand:uniform(N). +-endif. \ No newline at end of file diff --git a/test.config b/test.config index 391adaf..20adf70 100644 --- a/test.config +++ b/test.config @@ -7,6 +7,7 @@ {pool_size, 5}, {pool_max_overflow, 0}, {database, 0}, - {password, ""} + {password, ""}, + {replica_read_flag, true} ] }].