diff --git a/Changes b/Changes index 0f5c8b9..8a0a00b 100644 --- a/Changes +++ b/Changes @@ -1,6 +1,8 @@ Revision history for Redis {{$NEXT}} + * add supporte for new()'s "role" parameter: connect to slaves using + Sentinel for discovery 1.979 2015-05-14 14:28:35CEST+0200 Europe/Amsterdam diff --git a/lib/Redis.pm b/lib/Redis.pm index 31eb9cc..699327c 100644 --- a/lib/Redis.pm +++ b/lib/Redis.pm @@ -61,7 +61,7 @@ sub new { } defined $args{$_} - and $self->{$_} = $args{$_} for + and $self->{$_} = $args{$_} for qw(password on_connect name no_auto_connect_on_new cnx_timeout write_timeout read_timeout sentinels_cnx_timeout sentinels_write_timeout sentinels_read_timeout no_sentinels_list_update); @@ -69,6 +69,7 @@ sub new { $self->{reconnect} = $args{reconnect} || 0; $self->{conservative_reconnect} = $args{conservative_reconnect} || 0; $self->{every} = $args{every} || 1000; + $self->{role} = $args{role}; if (exists $args{sock}) { $self->{server} = $args{sock}; @@ -83,6 +84,7 @@ sub new { }; } elsif ($args{sentinels}) { $self->{sentinels} = $args{sentinels}; + $self->{role} ||= 'master'; ref $self->{sentinels} eq 'ARRAY' or croak("'sentinels' param must be an ArrayRef"); @@ -106,7 +108,16 @@ sub new { ? $self->{sentinels_write_timeout} : 1 ), ) } or next; - my $server_address = $sentinel->get_service_address($self->{service}); + my $server_address; + if ($self->{role} eq 'slave') { + my $slaves = $sentinel->get_slaves($self->{service}); + $status = "no slaves found for service '$self->{service}'", next unless @$slaves; + my $pick = $slaves->[int(rand(scalar(@$slaves)))]; + $server_address = "$pick->{ip}:$pick->{port}"; + } + else { + $server_address = $sentinel->get_service_address($self->{service}); + } defined $server_address or $status ||= "Sentinels don't know this service", next; @@ -128,12 +139,12 @@ sub new { ( sort { $h{$a} <=> $h{$b} } keys %h ), # sorted existing sentinels, grep { ! $h{$_}; } # list of unknown map { +{ @$_ }->{name}; } # names of - $sentinel->sentinel( # sentinels + $sentinel->sentinel( # sentinels sentinels => $self->{service} # for this service ) ]; } - + return $self->_maybe_enable_timeouts( IO::Socket::INET->new( PeerAddr => $server_address, @@ -431,7 +442,7 @@ sub wait_for_messages { $s->remove($s->handles); $s->add($sock); - while ($s->can_read($timeout)) { + while ($s->can_read($timeout)) { my $has_stuff = $self->__try_read_sock($sock); # If the socket is ready to read but there is nothing to read, ( so # it's an EOF ), try to reconnect. @@ -448,7 +459,7 @@ sub wait_for_messages { # or undef ( socket became EOF), back to select until timeout } while ($self->{__buf} || $self->__try_read_sock($sock)); } - + }); } catch { @@ -623,16 +634,26 @@ sub __build_sock { sub __close_sock { my ($self) = @_; + $self->{__buf} = ''; delete $self->{__inside_watch}; delete $self->{__inside_transaction}; + return close(delete $self->{sock}); } sub __on_connection { - my ($self) = @_; + if ($self->{role}) { + my $role = $self->__get_server_role(); + if ($role ne $self->{role}) { + ## FIXME: how to force the process to retry? If we are in + ## reconnect mode, it's easy, just abuse it... if not, then + ## maybe we should just reuse it? + } + } + # If we are in PubSub mode we shouldn't perform any command besides # (p)(un)subscribe if (! $self->{is_subscriber}) { @@ -642,7 +663,7 @@ sub __on_connection { $n = $n->($self) if ref($n) eq 'CODE'; $self->client_setname($n) if defined $n; }; - + defined $self->{current_database} and $self->select($self->{current_database}); } @@ -666,7 +687,20 @@ sub __on_connection { defined $self->{on_connect} and $self->{on_connect}->($self); +} + + +sub __get_server_role { + my ($self) = @_; + my $role; + eval { ($role) = $self->role(); 1 } or do { + my $info = $self->info('replication'); + $role = $info->{role}; + }; + die "Could not determine role" unless $role; + + return $role; } @@ -924,10 +958,14 @@ __END__ my $redis = Redis->new(write_timeout => 1.2); ## Connect via a list of Sentinels to a given service - my $redis = Redis->new(sentinels => [ '127.0.0.1:12345' ], service => 'mymaster'); + my $redis = Redis->new(sentinels => [ '127.0.0.1:12345' ], service => 'my_cluster'); + + ## Connect to a random slave of a Sentinel monitored service + ## it will reconnect to a random different slave on disconnect + my $redis = Redis->new(sentinels => [ '127.0.0.1:12345' ], service => 'my_cluster', role => 'slave'); ## Same, but with connection, read and write timeout on the sentinel hosts - my $redis = Redis->new( sentinels => [ '127.0.0.1:12345' ], service => 'mymaster', + my $redis = Redis->new( sentinels => [ '127.0.0.1:12345' ], service => 'my_cluster', sentinels_cnx_timeout => 0.1, sentinels_read_timeout => 1, sentinels_write_timeout => 1, @@ -1079,6 +1117,33 @@ So, if you are working with character strings, you should pre-encode or post-dec sentinels_write_timeout => 1, ); +Creates a L<< Redis >> instance and connects to a Redis server. + +The constructor will try to find the server to connect to using multiple methods, in the sequence below. The first found is used. + +=over + +=item * + +the C<< sock >> parameter; + +=item * + +the C<< sentinels >> parameter; + +=item * + +the C<< server >> parameter; + +=item * + +the C<< REDIS_SERVER >> environment variable. + +=back + +A detailed explanation of each of these parameters and environment +variable is found below. + =head3 C<< server >> The C<< server >> parameter specifies the Redis server we should connect to, @@ -1086,26 +1151,42 @@ via TCP. Use the 'IP:PORT' format. If no C<< server >> option is present, we will attempt to use the C<< REDIS_SERVER >> environment variable. If neither of those options are present, it defaults to '127.0.0.1:6379'. -Alternatively you can use the C<< sock >> parameter to specify the path of the -UNIX domain socket where the Redis server is listening. +=head3 C<< sock >> + +The C<< sock >> parameter specifies the path of the UNIX domain socket +where the Redis server is listening. + +=head3 C<< sentinels >> and C<< service >> + +The C<< sentinels >> and the C<< service >> parameters specify a list of +sentinels to contact and try to get the address of the servers +supporting the given service name. -Alternatively you can use the C<< sentinels >> parameter and the C<< service >> -parameter to specify a list of sentinels to contact and try to get the address -of the given service name. C<< sentinels >> must be an ArrayRef and C<< service ->> an Str. +The C<< sentinels >> parameter must be an ArrayRef +and C<< service >> an Str. -The C<< REDIS_SERVER >> can be used for UNIX domain sockets too. The following -formats are supported: +By default this will connect you to the master instance of the service, +but you can use the C<< role >> set as "slave" to randomly connect to +one of the slaves. If no slaves are found, the connect call will die. + +Please note that this means that you can also die on reconnects. =over -=item * +=item -/path/to/sock +Tip: you can actually use C<< role >> to make sure you are connected to +the correct type of server, even if you don't use Sentinel. -=item * +=back -unix:/path/to/sock +=head3 C<< REDIS_SERVER ENV >> + +The C<< REDIS_SERVER >> environment variable can be used to specify the +address or UNIX domain socket to use. The following formats are +supported: + +=over =item * @@ -1115,6 +1196,14 @@ unix:/path/to/sock tcp:127.0.0.1:11011 +=item * + +/path/to/sock + +=item * + +unix:/path/to/sock + =back =head3 C<< reconnect >>, C<< every >> diff --git a/lib/Redis/Sentinel.pm b/lib/Redis/Sentinel.pm index 85d2b10..9be5c6f 100644 --- a/lib/Redis/Sentinel.pm +++ b/lib/Redis/Sentinel.pm @@ -31,6 +31,17 @@ sub get_masters { map { +{ @$_ }; } @{ shift->sentinel('masters') || [] }; } +sub get_slaves { + my @slaves; + + eval {@slaves = map { +{@$_}; } @{ shift->sentinel('slaves', shift) || [] }; 1 } or do { + die $@ unless $@ =~ m/ERR No such master with that name/; + return; + }; + + return \@slaves; +} + 1; __END__ @@ -69,4 +80,12 @@ service were found. Returns a list of HashRefs representing all the master redis instances that this sentinel monitors. +=head2 get_slaves + +Takes the name of a service as parameter. + +If the service is not known to the sentinels server, returns undef. If +the service is known, retuns an arrayRef of hashRef's, one for each +slave available on the service. + =cut diff --git a/t/60-sentinel.t b/t/60-sentinel.t old mode 100644 new mode 100755 index de86a12..623e5ce --- a/t/60-sentinel.t +++ b/t/60-sentinel.t @@ -10,19 +10,33 @@ use Redis::Sentinel; use lib 't/tlib'; use Test::SpawnRedisServer; -my @ret = redis(); -my $redis_port = pop @ret; -my ($c, $redis_addr) = @ret; -END { diag 'shutting down redis'; $c->() if $c } +my @ret_m = redis(); +my $redis_m_port = pop @ret_m; +my ($c_m, $redis_m_addr) = @ret_m; +END { diag 'shutting down redis'; $c_m->() if $c_m } -diag "redis address : $redis_addr\n"; +diag "redis master address : $redis_m_addr\n"; -my @ret2 = sentinel( redis_port => $redis_port ); +my @ret_s = redis(); +my $redis_s_port = pop @ret_s; +my ($c_s, $redis_s_addr) = @ret_s; +END { diag 'shutting down redis'; $c_s->() if $c_s } + +eval { Redis->new(server => $redis_s_addr)->slaveof('127.0.0.1', $redis_m_port); 1 } or do { + plan skip_all => '** FAILED to set slave server as a SLAVEOF master, aborting tests **'; +}; + +diag "redis slave address : $redis_s_addr\n"; + +diag('Waiting 1 second to make sure the master/slave setup is in place before starting Sentinels'); +sleep 1; + +my @ret2 = sentinel( redis_port => $redis_m_port ); my $sentinel_port = pop @ret2; my ($c2, $sentinel_addr) = @ret2; END { diag 'shutting down sentinel'; $c2->() if $c2 } -my @ret3 = sentinel( redis_port => $redis_port ); +my @ret3 = sentinel( redis_port => $redis_m_port ); my $sentinel2_port = pop @ret3; my ($c3, $sentinel2_addr) = @ret3; END { diag 'shutting down sentinel2'; $c3->() if $c3 } @@ -30,7 +44,7 @@ END { diag 'shutting down sentinel2'; $c3->() if $c3 } diag "sentinel address: $sentinel_addr\n"; diag "sentinel2 address: $sentinel2_addr\n"; -diag("wait 3 sec for the sentinels and the master to gossip"); +diag("wait 3 secs for the sentinels and the master to gossip"); sleep 3; { @@ -40,22 +54,37 @@ sleep 3; cmp_deeply($got, superhashof({ name => 'mymaster', ip => '127.0.0.1', - port => $redis_port, + port => $redis_m_port, flags => 'master', 'role-reported' => 'master', 'config-epoch' => 0, - 'num-slaves' => 0, + 'num-slaves' => 1, 'num-other-sentinels' => 1, quorum => 2, }), "sentinel has proper config of its master" ); + + $got = $sentinel->get_slaves('mymaster'); + cmp_deeply( + $got, + [ superhashof( + { 'port' => $redis_s_port, + 'flags' => "slave", + 'master-port' => $redis_m_port, + 'role-reported' => "slave", + 'name' => "127.0.0.1:$redis_s_port", + } + ) + ], + "sentinel has proper config of its slaves" + ); } { my $sentinel = Redis::Sentinel->new(server => $sentinel_addr); my $address = $sentinel->get_service_address('mymaster'); - is $address, "127.0.0.1:$redis_port", "found service mymaster"; + is $address, "127.0.0.1:$redis_m_port", "found service mymaster"; } { @@ -73,4 +102,10 @@ sleep 3; } +{ + # connect to the slave via the sentinel + my $redis = Redis->new(sentinels => [ $sentinel_addr ], service => 'mymaster', role => 'slave'); + is($redis->__get_server_role(), 'slave', 'Redis client connect to slave server via Sentinel'); +} + done_testing(); diff --git a/t/tlib/Test/SpawnRedisServer.pm b/t/tlib/Test/SpawnRedisServer.pm index d26723c..302ce5b 100644 --- a/t/tlib/Test/SpawnRedisServer.pm +++ b/t/tlib/Test/SpawnRedisServer.pm @@ -107,7 +107,7 @@ sub sentinel { sentinel down-after-milliseconds mymaster 2000 sentinel failover-timeout mymaster 4000 - logfile sentinel-$addr.log + logfile redis-sentinel-$addr.log "); $fh->flush;