From f811427dde705223e351d9b716232d0b494cb033 Mon Sep 17 00:00:00 2001 From: catcher92 Date: Wed, 13 May 2020 21:35:39 +0800 Subject: [PATCH 1/2] Support redis sentinel mode. --- .../provider/redis/ConnectionPool.scala | 16 +++++++++++----- .../redislabs/provider/redis/RedisConfig.scala | 8 ++++++-- .../spark/sql/redis/RedisSourceRelation.scala | 3 ++- 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala b/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala index 322d8c5f..bb4c5f46 100644 --- a/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala +++ b/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala @@ -1,16 +1,17 @@ package com.redislabs.provider.redis -import redis.clients.jedis.{JedisPoolConfig, Jedis, JedisPool} +import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig, JedisSentinelPool} import redis.clients.jedis.exceptions.JedisConnectionException - import java.util.concurrent.ConcurrentHashMap +import redis.clients.jedis.util.Pool + import scala.collection.JavaConversions._ object ConnectionPool { - @transient private lazy val pools: ConcurrentHashMap[RedisEndpoint, JedisPool] = - new ConcurrentHashMap[RedisEndpoint, JedisPool]() + @transient private lazy val pools: ConcurrentHashMap[RedisEndpoint, Pool[Jedis]] = + new ConcurrentHashMap[RedisEndpoint, Pool[Jedis]]() def connect(re: RedisEndpoint): Jedis = { val pool = pools.getOrElseUpdate(re, @@ -25,7 +26,12 @@ object ConnectionPool { poolConfig.setTimeBetweenEvictionRunsMillis(30000) poolConfig.setNumTestsPerEvictionRun(-1) - new JedisPool(poolConfig, re.host, re.port, re.timeout, re.auth, re.dbNum, re.ssl) + if (null == re.master || re.master.trim.isEmpty) { + new JedisPool(poolConfig, re.host, re.port, re.timeout, re.auth, re.dbNum, re.ssl) + } else { + val sentinels = re.host.split(",").map(x => x + ":" + re.port).toSet + new JedisSentinelPool(re.master.trim, sentinels, poolConfig, re.auth) + } } ) var sleepTime: Int = 4 diff --git a/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala b/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala index 91e2f05e..b3020375 100644 --- a/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala +++ b/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala @@ -24,8 +24,11 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST, auth: String = null, dbNum: Int = Protocol.DEFAULT_DATABASE, timeout: Int = Protocol.DEFAULT_TIMEOUT, - ssl: Boolean = false) + ssl: Boolean = false, + master: String = null) extends Serializable { + def apply(getHost: String, getPort: Int, str: String, i: Int, value: Any, bool: Boolean, str1: String): Unit = ??? + /** * Constructor from spark config. set params with spark.redis.host, spark.redis.port, spark.redis.auth, spark.redis.db and spark.redis.ssl @@ -39,7 +42,8 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST, conf.get("spark.redis.auth", null), conf.getInt("spark.redis.db", Protocol.DEFAULT_DATABASE), conf.getInt("spark.redis.timeout", Protocol.DEFAULT_TIMEOUT), - conf.getBoolean("spark.redis.ssl", false) + conf.getBoolean("spark.redis.ssl", false), + conf.get("spark.redis.sentinel.master", null) ) } diff --git a/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala b/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala index f2c84911..9670c371 100644 --- a/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala +++ b/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala @@ -41,7 +41,8 @@ class RedisSourceRelation(override val sqlContext: SQLContext, val dbNum = parameters.get("dbNum").map(_.toInt).getOrElse(Protocol.DEFAULT_DATABASE) val timeout = parameters.get("timeout").map(_.toInt).getOrElse(Protocol.DEFAULT_TIMEOUT) val ssl = parameters.get("ssl").map(_.toBoolean).getOrElse(false) - RedisEndpoint(host, port, auth, dbNum, timeout, ssl) + val master = parameters.getOrElse("master", null) + RedisEndpoint(host, port, auth, dbNum, timeout, ssl, master) } ) } From 310d4cf2edc0504d7635f326d73d3457ab534cda Mon Sep 17 00:00:00 2001 From: catcher92 Date: Fri, 15 May 2020 08:17:54 +0800 Subject: [PATCH 2/2] Support redis sentinel mode. --- .../scala/com/redislabs/provider/redis/RedisConfig.scala | 8 +++----- .../org/apache/spark/sql/redis/RedisSourceRelation.scala | 2 +- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala b/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala index b3020375..dbe7eece 100644 --- a/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala +++ b/src/main/scala/com/redislabs/provider/redis/RedisConfig.scala @@ -27,8 +27,6 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST, ssl: Boolean = false, master: String = null) extends Serializable { - def apply(getHost: String, getPort: Int, str: String, i: Int, value: Any, bool: Boolean, str1: String): Unit = ??? - /** * Constructor from spark config. set params with spark.redis.host, spark.redis.port, spark.redis.auth, spark.redis.db and spark.redis.ssl @@ -258,7 +256,7 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable { //simply re-enter this function witht he master host/port getNonClusterNodes(initialHost = new RedisEndpoint(host, port, - initialHost.auth, initialHost.dbNum, ssl = initialHost.ssl)) + initialHost.auth, initialHost.dbNum, ssl = initialHost.ssl, master = initialHost.master)) } else { //this is a master - take its slaves @@ -274,7 +272,7 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable { val range = nodes.length (0 until range).map(i => RedisNode(RedisEndpoint(nodes(i)._1, nodes(i)._2, initialHost.auth, initialHost.dbNum, - initialHost.timeout, initialHost.ssl), + initialHost.timeout, initialHost.ssl, initialHost.master), 0, 16383, i, range)).toArray } } @@ -304,7 +302,7 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable { val host = SafeEncoder.encode(node.get(0).asInstanceOf[Array[scala.Byte]]) val port = node.get(1).toString.toInt RedisNode(RedisEndpoint(host, port, initialHost.auth, initialHost.dbNum, - initialHost.timeout, initialHost.ssl), + initialHost.timeout, initialHost.ssl, initialHost.master), sPos, ePos, i, diff --git a/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala b/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala index 9670c371..d2d44356 100644 --- a/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala +++ b/src/main/scala/org/apache/spark/sql/redis/RedisSourceRelation.scala @@ -41,7 +41,7 @@ class RedisSourceRelation(override val sqlContext: SQLContext, val dbNum = parameters.get("dbNum").map(_.toInt).getOrElse(Protocol.DEFAULT_DATABASE) val timeout = parameters.get("timeout").map(_.toInt).getOrElse(Protocol.DEFAULT_TIMEOUT) val ssl = parameters.get("ssl").map(_.toBoolean).getOrElse(false) - val master = parameters.getOrElse("master", null) + val master = parameters.getOrElse("sentinel.master", null) RedisEndpoint(host, port, auth, dbNum, timeout, ssl, master) } )