From 9e60ee3849d2d19a032e1efad38450ffd575fd9e Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Sun, 28 Jan 2024 21:04:51 -0600 Subject: [PATCH] Add ReplicationClient --- spec/replication_client_spec.cr | 54 ++++ src/client.cr | 6 +- src/cluster.cr | 57 +---- src/connection.cr | 8 +- src/log.cr | 6 + src/read_only_commands.cr | 182 ++++++++++++++ src/replication_client.cr | 421 ++++++++++++++++++++++++++++++++ 7 files changed, 671 insertions(+), 63 deletions(-) create mode 100644 spec/replication_client_spec.cr create mode 100644 src/log.cr create mode 100644 src/read_only_commands.cr create mode 100644 src/replication_client.cr diff --git a/spec/replication_client_spec.cr b/spec/replication_client_spec.cr new file mode 100644 index 0000000..440d1ba --- /dev/null +++ b/spec/replication_client_spec.cr @@ -0,0 +1,54 @@ +require "./spec_helper" +require "../src/replication_client" + +module Redis + describe ReplicationClient do + describe ".parse_replication_section" do + it "parses the master's replication section" do + section = <<-SECTION + # Replication\r + role:master\r + connected_slaves:2\r + slave0:ip=10.76.3.39,port=6379,state=stable_sync,lag=0\r + slave1:ip=10.76.1.130,port=6379,state=stable_sync,lag=0\r + master_replid:b08ca5082296cf5b2c1de7207f2bc16bb8da3d80\r + + SECTION + + data = ReplicationClient::Info::Replication.new(section) + + data.role.master?.should eq true + data.connected_replicas.should eq 2 + data.replicas.should contain ReplicationClient::Info::Replica.new( + ip: "10.76.3.39", + port: 6379, + state: :stable_sync, + lag: 0.seconds, + ) + end + + it "parses a replica's replication section" do + section = <<-SECTION + # Replication\r + role:replica\r + master_host:10.76.2.33\r + master_port:9999\r + master_link_status:up\r + master_last_io_seconds_ago:0\r + master_sync_in_progress:0\r + + SECTION + + data = ReplicationClient::Info::Replication.new(section) + + data.role.master?.should eq false + data.role.replica?.should eq true + data.master_host.should eq "10.76.2.33" + data.master_port.should eq 9999 + data.master_link_status.should eq "up" + data.master_last_io.not_nil!.should be_within 1.seconds, of: Time.utc + data.master_sync_in_progress?.should eq false + end + end + end +end diff --git a/src/client.cr b/src/client.cr index a06f6cb..f587397 100644 --- a/src/client.cr +++ b/src/client.cr @@ -1,6 +1,8 @@ require "db/pool" +require "log" require "./connection" +require "./log" module Redis # The Redis client is the expected entrypoint for this shard. By default, it will connect to localhost:6379, but you can also supply a `URI` to connect to an arbitrary Redis server. SSL, password authentication, and DB selection are all supported. @@ -27,7 +29,7 @@ module Redis # The client holds a pool of connections that expands and contracts as # needed. - def initialize(uri : URI = URI.parse(ENV.fetch("REDIS_URL", "redis:///"))) + def initialize(uri : URI = URI.parse(ENV.fetch("REDIS_URL", "redis:///")), @log = Log) # defaults as per https://github.com/crystal-lang/crystal-db/blob/v0.11.0/src/db/pool.cr initial_pool_size = uri.query_params.fetch("initial_pool_size", 1).to_i max_pool_size = uri.query_params.fetch("max_pool_size", 0).to_i @@ -46,7 +48,7 @@ module Redis retry_attempts: retry_attempts, retry_delay: retry_delay, )) do - Connection.new(uri) + Connection.new(uri, log: log) end end diff --git a/src/cluster.cr b/src/cluster.cr index 1f0f2aa..880afcb 100644 --- a/src/cluster.cr +++ b/src/cluster.cr @@ -1,6 +1,7 @@ # require "./client" require "./connection" require "./commands" +require "./read_only_commands" require "db/pool" require "set" @@ -214,62 +215,6 @@ module Redis each_master(&.run({"flushdb"})) end - # Add commands here to route them to read-only replicas. - private READ_ONLY_COMMANDS = %w[ - dump - echo - eval_ro - evalsha_ro - exists - expiretime - get - getbit - getrange - hexists - hget - hgetall - hkeys - hlen - hmget - hstrlen - hvals - keys - lcs - lindex - llen - lpos - lrange - mget - pttl - randomkey - scard - sdiff - sinter - sintercard - sismember - smembers - smismember - srandmember - strlen - sunion - ttl - type - xlen - xrange - xrevrange - zcard - zcount - zdiff - zinter - zlexcount - zrandmember - zrange - zrangebylex - zrangebyscore - zrank - zrevrangebylex - ].to_set - def run(command full_command) if full_command.empty? raise ArgumentError.new("Redis commands must have at least one component") diff --git a/src/connection.cr b/src/connection.cr index d85d116..a2e5fd3 100644 --- a/src/connection.cr +++ b/src/connection.cr @@ -8,15 +8,13 @@ require "./pipeline" require "./value" require "./transaction" require "./writer" +require "./log" module Redis # The connection wraps the TCP connection to the Redis server. class Connection include Commands - # :nodoc: - LOG = ::Log.for(self) - @socket : TCPSocket | OpenSSL::SSL::Socket::Client # We receive all connection information in the URI. @@ -24,7 +22,7 @@ module Redis # SSL connections require specifying the `rediss://` scheme. # Password authentication uses the URI password. # DB selection uses the URI path. - def initialize(@uri = URI.parse("redis:///")) + def initialize(@uri = URI.parse("redis:///"), @log = Log) host = uri.host.presence || "localhost" port = uri.port || 6379 socket = TCPSocket.new(host, port) @@ -355,7 +353,7 @@ module Redis @writer.encode command flush result = read - LOG.debug &.emit "redis", command: command[0...2].join(' '), duration_ms: (Time.monotonic - start).total_milliseconds + @log.debug &.emit "redis", command: command[0...2].join(' '), duration_ms: (Time.monotonic - start).total_milliseconds return result rescue ex : IO::Error if retries > 0 diff --git a/src/log.cr b/src/log.cr new file mode 100644 index 0000000..b4a3cdc --- /dev/null +++ b/src/log.cr @@ -0,0 +1,6 @@ +require "log" + +module Redis + # Default Redis log + Log = ::Log.for(self) +end diff --git a/src/read_only_commands.cr b/src/read_only_commands.cr new file mode 100644 index 0000000..969190b --- /dev/null +++ b/src/read_only_commands.cr @@ -0,0 +1,182 @@ +module Redis + # Commands in this set are routed to replicas by `Redis::Cluster` and + # `Redis::ReplicationClient`. + # + # You can add additional commands that this shard does not yet know about + # (for example, one provided by a custom Redis module) by using the `<<` method: + # + # ``` + # Redis::READ_ONLY_COMMANDS << "mymodule.mycommand" + # ``` + READ_ONLY_COMMANDS = %w[ + bf.card + bf.debug + bf.exists + bf.info + bf.mexists + bf.scandump + bitcount + bitfield_ro + bitpos + cf.compact + cf.count + cf.debug + cf.exists + cf.info + cf.mexists + cf.scandump + cms.info + cms.query + dbsize + dump + eval_ro + evalsha_ro + exists + expiretime + fcall_ro + ft._aliasaddifnx + ft._aliasdelifx + ft._list + ft.aggregate + ft.aliasadd + ft.aliasdel + ft.aliasupdate + ft.config + ft.cursor + ft.debug + ft.dictadd + ft.dictdel + ft.dictdump + ft.explain + ft.explaincli + ft.get + ft.info + ft.mget + ft.profile + ft.search + ft.spellcheck + ft.sugget + ft.suglen + ft.syndump + ft.tagvals + geodist + geohash + geopos + georadius_ro + georadiusbymember_ro + geosearch + get + getbit + getrange + hexists + hget + hgetall + hkeys + hlen + hmget + hrandfield + hscan + hstrlen + hvals + json.arrindex + json.arrlen + json.debug + json.get + json.mget + json.objkeys + json.objlen + json.resp + json.strlen + json.type + keys + lcs + lindex + llen + lolwut + lpos + lrange + mget + pexpiretime + pfcount + pttl + randomkey + redisgears_2.clusterset + redisgears_2.clustersetfromshard + redisgears_2.forceshardsconnection + redisgears_2.hello + redisgears_2.infocluster + redisgears_2.innercommunication + redisgears_2.networktest + redisgears_2.refreshcluster + scan + scard + sdiff + sinter + sintercard + sismember + smembers + smismember + sort_ro + srandmember + sscan + strlen + substr + sunion + tdigest.byrank + tdigest.byrevrank + tdigest.cdf + tdigest.info + tdigest.max + tdigest.min + tdigest.quantile + tdigest.rank + tdigest.revrank + tdigest.trimmed_mean + timeseries.clusterset + timeseries.clustersetfromshard + timeseries.forceshardsconnection + timeseries.hello + timeseries.infocluster + timeseries.innercommunication + timeseries.networktest + timeseries.refreshcluster + topk.info + topk.list + topk.query + touch + ts.get + ts.info + ts.mget + ts.mrange + ts.mrevrange + ts.queryindex + ts.range + ts.revrange + ttl + type + xlen + xpending + xrange + xread + xrevrange + zcard + zcount + zdiff + zinter + zintercard + zlexcount + zmscore + zrandmember + zrange + zrangebylex + zrangebyscore + zrank + zrevrange + zrevrangebylex + zrevrangebyscore + zrevrank + zscan + zscore + zunion + ].to_set +end diff --git a/src/replication_client.cr b/src/replication_client.cr new file mode 100644 index 0000000..d560614 --- /dev/null +++ b/src/replication_client.cr @@ -0,0 +1,421 @@ +require "uri" +require "set" + +require "./client" +require "./connection" +require "./read_only_commands" + +# If you're using Redis replication, you can use `ReplicationClient` to send +# read commands to replicas and reduce load on the primary. This can be important +# when your Redis primary is CPU-bound. +# +# The commands that will be routed to replicas are listed in +# `Redis::READ_ONLY_COMMANDS`. +# +# NOTE: Redis replication does not provide consistency guarantees. Every +# mechanism in Redis to improve consistency, such as +# [WAIT](https://redis.io/commands/wait/#consistency-and-wait), is best-effort, +# but not guaranteed. If you require strong consistency from Redis, stick to +# using `Redis::Client`. if you require strong consistency but your Redis primary +# is CPU-bound, you may need to either choose between consistency and performance +# or move that workload out of Redis. +# +# This client is useful for operations where strong consistency isn't typically +# needed, such as caching, full-text search with `Redis::FullText#search`, +# querying time-series data with `Redis::TimeSeries#mrange`, checking the current +# state of larger data structures without blocking the primary, etc. +# +# ## Explicitly routing commands to a primary or replica +# +# This class provides `on_primary` and `on_replica` methods to ensure your +# command is routed to the server type you want. This is useful in several +# scenarios: +# +# - you want to ensure you retrieve a value that is consistent with the state of +# the primary server — for example a value that changes frequently and you +# need the canonical state for observability purposes +# - a read-only command is routed to a primary because this client does not yet +# know about it +# - You can add commands to `Redis::READ_ONLY_COMMANDS` in one-off cases +# - Feel free to [open an issue](https://github.com/jgaskins/redis/issues) or +# [pull request](https://github.com/jgaskins/redis/pulls) to add it, as well +# +# ## Topology changes +# +# If the replication topology changes (for example, new replicas are added, +# existing ones removed, or the primary failed over), `ReplicationClient` will +# automatically pick up the changes. You can set how often it checks for these +# changes with the `topology_ttl` argument to the constructor or leave it at its +# default of 10 seconds. +@[Experimental("`ReplicationClient` is currently in alpha testing. There may be rough edges.")] +class Redis::ReplicationClient + include Commands + + Log = ::Log.for(self) + + @master : Client + @replicas : Array(Client) + @master_uri : URI + @replica_uris : Array(URI) + getter topology_ttl : Time::Span + + def self.new + new(entrypoint: URI.parse("redis:///")) + end + + # Have the `ReplicationClient` discover the master and replicas on its own + # when given the URI of a single entrypoint. The cluster topology will be + # refreshed with a max staleness of `topology_ttl`. + # + # ``` + # redis = Redis::ReplicationClient.new( + # ``` + def initialize(entrypoint : URI, topology_ttl : Time::Span = 10.seconds) + connection = Connection.new(entrypoint, log: Log.for("redis.replication_client")) + + begin + result = connection.run({"info", "replication"}).as(String) + ensure + connection.close + end + + parsed = self.class.parse_replication_section(result) + case parsed.role + in .master? + initialize( + master_uri: entrypoint, + replica_uris: parsed + .replicas + .map do |replica| + entrypoint.dup.tap do |uri| + uri.host = replica.ip + uri.port = replica.port + # TODO: Should we ignore excessively lagged replicas? + end + end + .sort_by!(&.host.not_nil!), + topology_ttl: topology_ttl, + ) + in .replica? + initialize( + entrypoint.dup.tap do |uri| + uri.host = parsed.master_host + # Dragonfly seems to report 9999 as a default port? + if parsed.master_port != 9999 + uri.port = parsed.master_port + end + end, + topology_ttl: topology_ttl, + ) + end + end + + # Initialize the client with known master and replica URIs, keeping the + # toplogy up to date with at most `topology_ttl` staleness. If you don't wish + # to keep the replication topology up to date, you can simply set + # `topology_ttl` to `0.seconds`. + def initialize(@master_uri, @replica_uris, @topology_ttl = 10.seconds) + @master = Client.new(@master_uri, log: ::Log.for("redis.primary")) + + @replicas = @replica_uris.map do |uri| + Client.new(uri, log: ::Log.for("redis.replica")) + end + + if topology_ttl > 0.seconds + spawn do + replication = self.class.parse_replication_section(@master.run({"info", "replication"}).as(String)) + + until closed? + sleep @topology_ttl + # Check topology and update if needed + new_replication = self.class.parse_replication_section(@master.run({"info", "replication"}).as(String)) + if new_replication != replication + Log.info &.emit "Topology is changed, updating Redis::ReplicationClient" + topology_ttl = @topology_ttl + # Avoid re-spawning this fiber + initialize entrypoint: @master_uri, topology_ttl: 0.seconds + @topology_ttl = topology_ttl + replication = new_replication + end + end + end + end + end + + Connection.set_return_types! + + # :nodoc: + def finalize + close + end + + # Close all connections to both the primary and all replicas. + def close + @master.close rescue nil + @replicas.each do |replica| + replica.close rescue nil + end + + @closed = true + end + + # Returns `true` if this `ReplicationClient` has been explicitly closed, + # `false` otherwise. + getter? closed = false + + protected def self.parse_replication_section(text : String) + Info::Replication.new text + end + + private module Info + struct Replication + getter role : Role + + # Master + # connected_slaves:2\r + # slave0:ip=10.76.3.39,port=6379,state=stable_sync,lag=0\r + # slave1:ip=10.76.1.130,port=6379,state=stable_sync,lag=0\r + # master_replid:b08ca5082296cf5b2c1de7207f2bc16bb8da3d80\r + getter connected_replicas = 0 + getter replicas : Array(Info::Replica) { [] of Info::Replica } + getter master_replid : String? + + # Replica + # master_host:10.76.2.33\r + # master_port:9999\r + # master_link_status:up\r + # master_last_io_seconds_ago:0\r + # master_sync_in_progress:0\r + getter master_host : String? + getter master_port : Int32? + getter master_link_status : String? + getter master_last_io : Time? + getter? master_sync_in_progress : Bool? + + def initialize(text : String) + found_role = false + role = "" + + found_connected_replicas = false + connected_replicas = 0 + + master_replid = "" + + text.each_line(chomp: true).with_index do |line, index| + next if line.starts_with? '#' + + case line + when .starts_with? "role:" + found_role = true + role = line[5..] + when .starts_with? "connected_slaves:" + @connected_replicas = line["connected_slaves:".bytesize..].to_i + @replicas = Array(Info::Replica).new(initial_capacity: connected_replicas) + when .starts_with? "slave" + if separator_index = line.index(':') + replicas << Replica.new(line[separator_index + 1..]) + else + raise ArgumentError.new("Cannot read line: #{line.inspect}") + end + when .starts_with? "master_host:" + @master_host = line["master_host:".bytesize..] + when .starts_with? "master_port:" + @master_port = line["master_port:".bytesize..].to_i + when .starts_with? "master_link_status:" + @master_link_status = line["master_link_status:".bytesize..] + when .starts_with? "master_last_io_seconds_ago:" + @master_last_io = line["master_last_io_seconds_ago:".bytesize..].to_i.seconds.ago + when .starts_with? "master_sync_in_progress:" + # No need to create a substring, we just need to check the last byte + @master_sync_in_progress = line.ends_with? '1' + end + end + + if found_role + @role = Role.parse(role) + else + raise ArgumentError.new("Missing role") + end + end + + enum Role + Master + Replica + end + end + + struct Replica + getter ip : String + getter port : Int32 + getter state : State + getter lag : Time::Span + + def initialize(text : String) + found_ip = false + ip = "" + + found_port = false + port = 0 + + found_state = false + state : State = :stable_sync + + found_lag = false + lag = 0.seconds + + token_start = 0 + parse_state = ParseState::ReadingKey + key = "" + value = "" + text.size.times do |cursor| + case text[cursor] + when '=' + parse_state = ParseState::KVSeparator + when ',' + parse_state = ParseState::EntrySeparator + end + if cursor == text.size - 1 + parse_state = ParseState::End + end + + case parse_state + in .reading_key? + in .reading_value? + in .kv_separator? + key = text[token_start...cursor] + parse_state = ParseState::ReadingValue + token_start = cursor + 1 + in .entry_separator?, .end? + value = text[token_start..(parse_state.entry_separator? ? cursor - 1 : cursor)] + parse_state = ParseState::ReadingKey + token_start = cursor + 1 + + case key + when "ip" + found_ip = true + ip = value + when "port" + found_port = true + port = value.to_i + when "state" + found_state = true + state = State.parse(value) + when "lag" + found_lag = true + lag = value.to_i.seconds + end + end + end + + if found_ip && found_port && found_state && found_lag + initialize(ip: ip, port: port, state: state, lag: lag) + else + raise ArgumentError.new("Replica info string must contain ip, port, state, and lag. Received: #{text.inspect}.") + end + end + + def initialize(@ip, @port, @state, @lag) + end + + def ==(other : self) + ip == other.ip && port == other.port + end + + enum State + StableSync + end + + private enum ParseState + ReadingKey + ReadingValue + KVSeparator + EntrySeparator + End + end + end + end + + def run(command full_command) + if full_command.empty? + raise ArgumentError.new("Redis commands must have at least one component") + end + + if READ_ONLY_COMMANDS.includes? full_command[0].downcase + on_replica(&.run full_command) + else + @master.run full_command + end + end + + # Route one or more commands to replicas. This should rarely be necessary since + # read-only commands (which can only be executed on replicas) are automatically + # routed to replicas, but if it's a command this shard does not know about (see + # `Redis::READ_ONLY_COMMANDS`) this may be necessary. Alternatively, you can + # shovel additional commands into `Redis::READ_ONLY_COMMANDS` to avoid having to + # perform this explicit routing. + def on_replica + if @replicas.empty? + yield @master + else + yield @replicas.sample + end + end + + # Route one or more commands to the primary to avoid consistency issues arising + # from replication latency. + # + # ``` + # require "redis/replication_client" + # + # redis = Redis::ReplicationClient.new + # + # redis.incr "counter" + # value = redis.on_primary &.get("counter") + # ``` + # + # This is useful for pipelining commands or executing transactions: + # + # ``` + # redis.on_primary &.transaction do |txn| + # txn.incr "counter:#{queue}" + # txn.sadd "queues", queue + # txn.lpush "queue:#{queue}", job_data + # end + # ``` + # + # … which is shorthand for this and removes the need for nesting blocks: + # + # ``` + # redis.on_primary do |primary| + # primary.transaction do |txn| + # txn.incr "counter:#{queue}" + # txn.sadd "queues", queue + # txn.lpush "queue:#{queue}", job_data + # end + # end + # ``` + # + # If you need to route many commands to the primary without necessarily + # pipelining or opening transactions, you can omit the `&.transaction` and + # call methods directly on the primary's `Redis::Client` in the block: + # + # ``` + # redis.on_primary do |primary| + # counter = primary.incr "counter:#{queue}" + # primary.sadd "queues", queue + # end + # ``` + # + # NOTE: The object yielded to the block is a `Redis::Client`, but if you try + # to use it outside the block you may run into errors because the replication + # topology could change, in which case this `Redis::Client` might not be the + # primary anymore. + def on_primary + on_master { |redis| yield redis } + end + + # Alias of `on_primary`. + def on_master + yield @master + end +end