Skip to content
4 changes: 3 additions & 1 deletion include/eredis_cluster.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
start_slot :: integer(),
end_slot :: integer(),
index :: integer(),
node :: #node{}
type :: atom(),
node :: #node{},
replicas :: [#node{}]
}).

-define(REDIS_CLUSTER_HASH_SLOTS, 16384).
Expand Down
3 changes: 2 additions & 1 deletion rebar.config
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{erl_opts, [warnings_as_errors,
{erl_opts, [{platform_define, "^17\.", 'ERLANG_OTP_VERSION_17'},
warnings_as_errors,
warn_export_all]}.

{xref_checks, [undefined_function_calls]}.
Expand Down
174 changes: 166 additions & 8 deletions src/eredis_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
-export([stop/1]).

% API.
-export([start/0, stop/0, connect/1]). % Application Management.
-export([start/0, stop/0, connect/1, connect/2]). % Application Management.

% Generic redis call
-export([q/1, qp/1, qw/2, qk/2, qa/1, qmn/1, transaction/1, transaction/2]).
Expand Down Expand Up @@ -48,6 +48,18 @@ stop() ->
connect(InitServers) ->
eredis_cluster_monitor:connect(InitServers).

-spec connect(InitServers::term(), ReplicaReads::boolean()) -> Result::term().
connect(InitServers, ReplicaReads) ->
case ReplicaReads of
true ->
eredis_cluster_monitor:connect(InitServers, ReplicaReads);
false ->
connect(InitServers);
_ ->
throw({error, configuration_error})
end.


%% =============================================================================
%% @doc Wrapper function to execute a pipeline command as a transaction Command
%% (it will add MULTI and EXEC command)
Expand Down Expand Up @@ -124,8 +136,9 @@ split_by_pools(Commands) ->

split_by_pools([Command | T], Index, CmdAcc, MapAcc, State) ->
Key = get_key_from_command(Command),
CommandType = get_command_type(Command),
Slot = get_key_slot(Key),
{Pool, _Version} = eredis_cluster_monitor:get_pool_by_slot(Slot, State),
{Pool, _Version} = eredis_cluster_monitor:get_pool_by_slot(Slot, State, CommandType),
{NewAcc1, NewAcc2} =
case lists:keyfind(Pool, 1, CmdAcc) of
false ->
Expand Down Expand Up @@ -172,20 +185,25 @@ query(_, undefined) ->
{error, invalid_cluster_command};
query(Command, PoolKey) ->
Slot = get_key_slot(PoolKey),
CommandType = get_command_type(Command),
Transaction = fun(Worker) -> qw(Worker, Command) end,
query(Transaction, Slot, 0).
query(Transaction, Slot, 0, CommandType).

query(_, _, ?REDIS_CLUSTER_REQUEST_TTL) ->
{error, no_connection};

query(Transaction, Slot, Counter) ->
%% Throttle retries
throttle_retries(Counter),
%% In case of a transcation where type of the node can not be judged, it should be sent to master
query(Transaction, Slot, Counter, write).

{Pool, Version} = eredis_cluster_monitor:get_pool_by_slot(Slot),

query(Transaction, Slot, Counter, CommandType) ->
%% Throttle retries
throttle_retries(Counter),
{Pool, Version} = eredis_cluster_monitor:get_pool_by_slot(Slot, CommandType),
Result = eredis_cluster_pool:transaction(Pool, Transaction),
case handle_transaction_result(Result, Version) of
retry -> query(Transaction, Slot, Counter + 1);
retry -> query(Transaction, Slot, Counter + 1, CommandType);
Result -> Result
end.

Expand Down Expand Up @@ -338,7 +356,7 @@ eval(Script, ScriptHash, Keys, Args) ->
%% =============================================================================
-spec qa(redis_command()) -> ok | {error, Reason::bitstring()}.
qa(Command) ->
Pools = eredis_cluster_monitor:get_all_pools(),
Pools = eredis_cluster_monitor:get_all_master_pools(),
Transaction = fun(Worker) -> qw(Worker, Command) end,
[eredis_cluster_pool:transaction(Pool, Transaction) || Pool <- Pools].

Expand Down Expand Up @@ -442,6 +460,146 @@ get_key_from_command([Term1,Term2|Rest]) ->
get_key_from_command(_) ->
undefined.

%% =============================================================================
%% @doc Return the command type. It can be either read or write
%% The command list has been compiled with command from redis and filtering only
%% the readonly queries
%% @end
%% =============================================================================


get_command_type([Term1|Rest]) when is_bitstring(Term1) ->
get_command_type([bitstring_to_list(Term1)|Rest]);
get_command_type([Term1|_]) ->
case string:to_lower(Term1) of
"zrange" ->
read;
"touch" ->
read;
"zrevrangebylex" ->
read;
"dump" ->
read;
"zcount" ->
read;
"hmget" ->
read;
"hgetall" ->
read;
"sinter" ->
read;
"georadius_ro" ->
read;
"exists" ->
read;
"zrevrank" ->
read;
"zrangebyscore" ->
read;
"dbsize" ->
read;
"lrange" ->
read;
"type" ->
read;
"zlexcount" ->
read;
"zrank" ->
read;
"llen" ->
read;
"geodist" ->
read;
"bitcount" ->
read;
"zrangebylex" ->
read;
"sdiff" ->
read;
"hscan" ->
read;
"hexists" ->
read;
"randomkey" ->
read;
"zrevrangebyscore" ->
read;
"zrevrange" ->
read;
"pttl" ->
read;
"zcard" ->
read;
"ttl" ->
read;
"zscore" ->
read;
"smembers" ->
read;
"scan" ->
read;
"substr" ->
read;
"geohash" ->
read;
"georadiusbymember_ro" ->
read;
"geopos" ->
read;
"getrange" ->
read;
"strlen" ->
read;
"sscan" ->
read;
"hkeys" ->
read;
"sismember" ->
read;
"hstrlen" ->
read;
"mget" ->
read;
"get" ->
read;
"srandmember" ->
read;
"hvals" ->
read;
"psync" ->
read;
"sync" ->
read;
"object" ->
read;
"pfcount" ->
read;
"bitpos" ->
read;
"lindex" ->
read;
"keys" ->
read;
"getbit" ->
read;
"memory" ->
read;
"scard" ->
read;
"zscan" ->
read;
"hget" ->
read;
"sunion" ->
read;
"hlen" ->
read;
_ ->
write
end;
get_command_type(_) ->
write. %% Marking it write command for just in case

%% =============================================================================
%% @doc Get key for command where the key is in th 4th position (eval and
%% evalsha commands)
Expand Down
Loading