Skip to content

Commit aca7908

Browse files
authored
Merge pull request #345 from RedisLabs/fix-328-acl-auth-v3
Fix #328 acl auth in spark 3 and update jedis, spark version
2 parents 8f6c447 + db23347 commit aca7908

23 files changed

+272
-74
lines changed

.circleci/config.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ jobs:
6161
name: install Redis
6262
command: |
6363
sudo apt-get -y update
64-
sudo apt-get install -y gcc-8 g++-8 libssl-dev
64+
sudo apt-get install -y libssl-dev
6565
wget http://download.redis.io/releases/redis-6.0.10.tar.gz
6666
tar -xzvf redis-6.0.10.tar.gz
6767
make -C redis-6.0.10 -j`nproc` BUILD_TLS=yes
@@ -118,7 +118,7 @@ jobs:
118118
name: install Redis
119119
command: |
120120
sudo apt-get -y update
121-
sudo apt-get install -y gcc-8 g++-8 libssl-dev
121+
sudo apt-get install -y libssl-dev
122122
wget http://download.redis.io/releases/redis-6.0.10.tar.gz
123123
tar -xzvf redis-6.0.10.tar.gz
124124
make -C redis-6.0.10 -j`nproc` BUILD_TLS=yes

Makefile

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
# user
2+
USER_ACL = user alice on >p1pp0 ~* +@all
3+
14
# STANDALONE REDIS NODE
25
define REDIS_STANDALONE_NODE_CONF
36
daemonize yes
@@ -7,6 +10,7 @@ logfile /tmp/redis_standalone_node_for_spark-redis.log
710
save ""
811
appendonly no
912
requirepass passwd
13+
$(USER_ACL)
1014
endef
1115

1216
# STANDALONE REDIS NODE WITH SSL
@@ -18,6 +22,7 @@ logfile /tmp/redis_standalone_node_ssl_for_spark-redis.log
1822
save ""
1923
appendonly no
2024
requirepass passwd
25+
$(USER_ACL)
2126
tls-auth-clients no
2227
tls-port 6380
2328
tls-cert-file ./src/test/resources/tls/redis.crt
@@ -30,6 +35,7 @@ endef
3035
define REDIS_CLUSTER_NODE1_CONF
3136
daemonize yes
3237
port 7379
38+
$(USER_ACL)
3339
pidfile /tmp/redis_cluster_node1_for_spark-redis.pid
3440
logfile /tmp/redis_cluster_node1_for_spark-redis.log
3541
save ""
@@ -41,6 +47,7 @@ endef
4147
define REDIS_CLUSTER_NODE2_CONF
4248
daemonize yes
4349
port 7380
50+
$(USER_ACL)
4451
pidfile /tmp/redis_cluster_node2_for_spark-redis.pid
4552
logfile /tmp/redis_cluster_node2_for_spark-redis.log
4653
save ""
@@ -52,6 +59,7 @@ endef
5259
define REDIS_CLUSTER_NODE3_CONF
5360
daemonize yes
5461
port 7381
62+
$(USER_ACL)
5563
pidfile /tmp/redis_cluster_node3_for_spark-redis.pid
5664
logfile /tmp/redis_cluster_node3_for_spark-redis.log
5765
save ""

README.md

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,13 @@ Spark-Redis also supports Spark Streaming (DStreams) and Structured Streaming.
2020
The library has several branches, each corresponds to a different supported Spark version. For example, 'branch-2.3' works with any Spark 2.3.x version.
2121
The master branch contains the recent development for the next release.
2222

23-
| Spark-Redis | Spark | Redis | Supported Scala Versions |
24-
| ----------------------------------------------------------------| ------------- | ---------------- | ------------------------ |
25-
| [master](https://github.com/RedisLabs/spark-redis/) | 3.0.x | >=2.9.0 | 2.12 |
26-
| [2.4, 2.5, 2.6](https://github.com/RedisLabs/spark-redis/tree/branch-2.4) | 2.4.x | >=2.9.0 | 2.11, 2.12 |
27-
| [2.3](https://github.com/RedisLabs/spark-redis/tree/branch-2.3) | 2.3.x | >=2.9.0 | 2.11 |
28-
| [1.4](https://github.com/RedisLabs/spark-redis/tree/branch-1.4) | 1.4.x | | 2.10 |
23+
| Spark-Redis | Spark | Redis | Supported Scala Versions |
24+
|---------------------------------------------------------------------------|-------| ---------------- | ------------------------ |
25+
| [master](https://github.com/RedisLabs/spark-redis/) | 3.2.x | >=2.9.0 | 2.12 |
26+
| [3.0](https://github.com/RedisLabs/spark-redis/tree/branch-3.0) | 3.0.x | >=2.9.0 | 2.12 |
27+
| [2.4, 2.5, 2.6](https://github.com/RedisLabs/spark-redis/tree/branch-2.4) | 2.4.x | >=2.9.0 | 2.11, 2.12 |
28+
| [2.3](https://github.com/RedisLabs/spark-redis/tree/branch-2.3) | 2.3.x | >=2.9.0 | 2.11 |
29+
| [1.4](https://github.com/RedisLabs/spark-redis/tree/branch-1.4) | 1.4.x | | 2.10 |
2930

3031

3132
## Known limitations

doc/configuration.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ The supported configuration parameters are:
55
* `spark.redis.host` - host or IP of the initial node we connect to. The connector will read the cluster
66
topology from the initial node, so there is no need to provide the rest of the cluster nodes.
77
* `spark.redis.port` - the initial node's TCP redis port.
8+
* `spark.redis.user` - the initial node's AUTH user
89
* `spark.redis.auth` - the initial node's AUTH password
910
* `spark.redis.db` - optional DB number. Avoid using this, especially in cluster mode.
1011
* `spark.redis.timeout` - connection timeout in ms, 2000 ms by default

doc/dataframe.md

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -330,22 +330,23 @@ root
330330

331331
## DataFrame options
332332

333-
| Name | Description | Type | Default |
334-
| -----------------------| ------------------------------------------------------------------------------------------| --------------------- | ------- |
335-
| model | defines the Redis model used to persist DataFrame, see [Persistence model](#persistence-model)| `enum [binary, hash]` | `hash` |
336-
| filter.keys.by.type | make sure the underlying data structures match persistence model | `Boolean` | `false` |
337-
| partitions.number | number of partitions (applies only when reading DataFrame) | `Int` | `3` |
333+
| Name | Description | Type | Default |
334+
|------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------| --------------------- | ------- |
335+
| model | defines the Redis model used to persist DataFrame, see [Persistence model](#persistence-model) | `enum [binary, hash]` | `hash` |
336+
| filter.keys.by.type | make sure the underlying data structures match persistence model | `Boolean` | `false` |
337+
| partitions.number | number of partitions (applies only when reading DataFrame) | `Int` | `3` |
338338
| key.column | when writing - specifies unique column used as a Redis key, by default a key is auto-generated <br/> when reading - specifies column name to store hash key | `String` | - |
339-
| ttl | data time to live in `seconds`. Data doesn't expire if `ttl` is less than `1` | `Int` | `0` |
340-
| infer.schema | infer schema from random row, all columns will have `String` type | `Boolean` | `false` |
341-
| max.pipeline.size | maximum number of commands per pipeline (used to batch commands) | `Int` | 100 |
342-
| scan.count | count option of SCAN command (used to iterate over keys) | `Int` | 100 |
343-
| iterator.grouping.size | the number of items to be grouped when iterating over underlying RDD partition | `Int` | 1000 |
344-
| host | overrides `spark.redis.host` configured in SparkSession | `String` | `localhost` |
345-
| port | overrides `spark.redis.port` configured in SparkSession | `Int` | `6379` |
346-
| auth | overrides `spark.redis.auth` configured in SparkSession | `String` | - |
347-
| dbNum | overrides `spark.redis.db` configured in SparkSession | `Int` | `0` |
348-
| timeout | overrides `spark.redis.timeout` configured in SparkSession | `Int` | `2000` |
339+
| ttl | data time to live in `seconds`. Data doesn't expire if `ttl` is less than `1` | `Int` | `0` |
340+
| infer.schema | infer schema from random row, all columns will have `String` type | `Boolean` | `false` |
341+
| max.pipeline.size | maximum number of commands per pipeline (used to batch commands) | `Int` | 100 |
342+
| scan.count | count option of SCAN command (used to iterate over keys) | `Int` | 100 |
343+
| iterator.grouping.size | the number of items to be grouped when iterating over underlying RDD partition | `Int` | 1000 |
344+
| host | overrides `spark.redis.host` configured in SparkSession (if set, any other connection setting from SparkSession is ignored ) | `String` | `localhost` |
345+
| port | overrides `spark.redis.port` configured in SparkSession (if set, any other connection setting from SparkSession is ignored ) | `Int` | `6379` |
346+
| user | overrides `spark.redis.user` configured in SparkSession (if set, any other connection setting from SparkSession is ignored ) | `String` | - |
347+
| auth | overrides `spark.redis.auth` configured in SparkSession (if set, any other connection setting from SparkSession is ignored ) | `String` | - |
348+
| dbNum | overrides `spark.redis.db` configured in SparkSession (if set, any other connection setting from SparkSession is ignored ) | `Int` | `0` |
349+
| timeout | overrides `spark.redis.timeout` configured in SparkSession (if set, any other connection setting from SparkSession is ignored ) | `Int` | `2000` |
349350

350351

351352
## Known limitations

pom.xml

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<modelVersion>4.0.0</modelVersion>
44
<groupId>com.redislabs</groupId>
55
<artifactId>spark-redis_2.12</artifactId>
6-
<version>3.0.0-SNAPSHOT</version>
6+
<version>3.1.0-SNAPSHOT</version>
77
<name>Spark-Redis</name>
88
<description>A Spark library for Redis</description>
99
<url>http://github.com/RedisLabs/spark-redis</url>
@@ -49,8 +49,8 @@
4949
<java.version>1.8</java.version>
5050
<scala.major.version>2.12</scala.major.version>
5151
<scala.complete.version>${scala.major.version}.0</scala.complete.version>
52-
<jedis.version>3.4.1</jedis.version>
53-
<spark.version>3.0.1</spark.version>
52+
<jedis.version>3.9.0</jedis.version>
53+
<spark.version>3.2.1</spark.version>
5454
<plugins.scalatest.version>1.0</plugins.scalatest.version>
5555
</properties>
5656

@@ -271,11 +271,6 @@
271271
</plugins>
272272
</build>
273273
<dependencies>
274-
<dependency>
275-
<groupId>org.apache.commons</groupId>
276-
<artifactId>commons-pool2</artifactId>
277-
<version>2.0</version>
278-
</dependency>
279274
<dependency>
280275
<groupId>redis.clients</groupId>
281276
<artifactId>jedis</artifactId>

src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package com.redislabs.provider.redis
22

3-
import redis.clients.jedis.{JedisPoolConfig, Jedis, JedisPool}
43
import redis.clients.jedis.exceptions.JedisConnectionException
4+
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
55

6+
import java.time.Duration
67
import java.util.concurrent.ConcurrentHashMap
7-
88
import scala.collection.JavaConversions._
99

1010

@@ -21,11 +21,11 @@ object ConnectionPool {
2121
poolConfig.setTestOnBorrow(false)
2222
poolConfig.setTestOnReturn(false)
2323
poolConfig.setTestWhileIdle(false)
24-
poolConfig.setMinEvictableIdleTimeMillis(60000)
25-
poolConfig.setTimeBetweenEvictionRunsMillis(30000)
24+
poolConfig.setSoftMinEvictableIdleTime(Duration.ofMinutes(1))
25+
poolConfig.setTimeBetweenEvictionRuns(Duration.ofSeconds(30))
2626
poolConfig.setNumTestsPerEvictionRun(-1)
2727

28-
new JedisPool(poolConfig, re.host, re.port, re.timeout, re.auth, re.dbNum, re.ssl)
28+
new JedisPool(poolConfig, re.host, re.port, re.timeout, re.user, re.auth, re.dbNum, re.ssl)
2929
}
3030
)
3131
var sleepTime: Int = 4

src/main/scala/com/redislabs/provider/redis/RedisConfig.scala

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@ import scala.collection.JavaConversions._
1515
*
1616
* @param host the redis host or ip
1717
* @param port the redis port
18+
* @param user the authentication username
1819
* @param auth the authentication password
1920
* @param dbNum database number (should be avoided in general)
2021
* @param ssl true to enable SSL connection. Defaults to false
2122
*/
2223
case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST,
2324
port: Int = Protocol.DEFAULT_PORT,
25+
user: String = null,
2426
auth: String = null,
2527
dbNum: Int = Protocol.DEFAULT_DATABASE,
2628
timeout: Int = Protocol.DEFAULT_TIMEOUT,
@@ -36,6 +38,7 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST,
3638
this(
3739
conf.get("spark.redis.host", Protocol.DEFAULT_HOST),
3840
conf.getInt("spark.redis.port", Protocol.DEFAULT_PORT),
41+
conf.get("spark.redis.user", null),
3942
conf.get("spark.redis.auth", null),
4043
conf.getInt("spark.redis.db", Protocol.DEFAULT_DATABASE),
4144
conf.getInt("spark.redis.timeout", Protocol.DEFAULT_TIMEOUT),
@@ -54,6 +57,7 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST,
5457
this(
5558
parameters.getOrElse("host", conf.get("spark.redis.host", Protocol.DEFAULT_HOST)),
5659
parameters.getOrElse("port", conf.get("spark.redis.port", Protocol.DEFAULT_PORT.toString)).toInt,
60+
parameters.getOrElse("user", conf.get("spark.redis.user", null)),
5761
parameters.getOrElse("auth", conf.get("spark.redis.auth", null)),
5862
parameters.getOrElse("dbNum", conf.get("spark.redis.db", Protocol.DEFAULT_DATABASE.toString)).toInt,
5963
parameters.getOrElse("timeout", conf.get("spark.redis.timeout", Protocol.DEFAULT_TIMEOUT.toString)).toInt,
@@ -64,17 +68,17 @@ case class RedisEndpoint(host: String = Protocol.DEFAULT_HOST,
6468
/**
6569
* Constructor with Jedis URI
6670
*
67-
* @param uri connection URI in the form of redis://:$password@$host:$port/[dbnum]. Use "rediss://" scheme for redis SSL
71+
* @param uri connection URI in the form of redis://$user:$password@$host:$port/[dbnum]. Use "rediss://" scheme for redis SSL
6872
*/
6973
def this(uri: URI) {
70-
this(uri.getHost, uri.getPort, JedisURIHelper.getPassword(uri), JedisURIHelper.getDBIndex(uri),
74+
this(uri.getHost, uri.getPort, JedisURIHelper.getUser(uri), JedisURIHelper.getPassword(uri), JedisURIHelper.getDBIndex(uri),
7175
Protocol.DEFAULT_TIMEOUT, uri.getScheme == RedisSslScheme)
7276
}
7377

7478
/**
7579
* Constructor with Jedis URI from String
7680
*
77-
* @param uri connection URI in the form of redis://:$password@$host:$port/[dbnum]. Use "rediss://" scheme for redis SSL
81+
* @param uri connection URI in the form of redis://$user:$password@$host:$port/[dbnum]. Use "rediss://" scheme for redis SSL
7882
*/
7983
def this(uri: String) {
8084
this(URI.create(uri))
@@ -280,8 +284,14 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
280284
val port = replinfo.filter(_.contains("master_port:"))(0).trim.substring(12).toInt
281285

282286
//simply re-enter this function with the master host/port
283-
getNonClusterNodes(initialHost = new RedisEndpoint(host, port,
284-
initialHost.auth, initialHost.dbNum, ssl = initialHost.ssl))
287+
getNonClusterNodes(initialHost = RedisEndpoint(
288+
host = host,
289+
port = port,
290+
user = initialHost.user,
291+
auth = initialHost.auth,
292+
dbNum = initialHost.dbNum,
293+
ssl = initialHost.ssl
294+
))
285295

286296
} else {
287297
//this is a master - take its slaves
@@ -295,10 +305,17 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
295305

296306
val nodes = master +: slaves
297307
val range = nodes.length
298-
(0 until range).map(i =>
299-
RedisNode(RedisEndpoint(nodes(i)._1, nodes(i)._2, initialHost.auth, initialHost.dbNum,
300-
initialHost.timeout, initialHost.ssl),
301-
0, 16383, i, range)).toArray
308+
(0 until range).map(i => {
309+
val endpoint = RedisEndpoint(
310+
host = nodes(i)._1,
311+
port = nodes(i)._2,
312+
user = initialHost.user,
313+
auth = initialHost.auth,
314+
dbNum = initialHost.dbNum,
315+
timeout = initialHost.timeout,
316+
ssl = initialHost.ssl)
317+
RedisNode(endpoint, 0, 16383, i, range)
318+
}).toArray
302319
}
303320
}
304321

@@ -326,12 +343,15 @@ class RedisConfig(val initialHost: RedisEndpoint) extends Serializable {
326343
val node = slotInfo(i + 2).asInstanceOf[java.util.List[java.lang.Object]]
327344
val host = SafeEncoder.encode(node.get(0).asInstanceOf[Array[scala.Byte]])
328345
val port = node.get(1).toString.toInt
329-
RedisNode(RedisEndpoint(host, port, initialHost.auth, initialHost.dbNum,
330-
initialHost.timeout, initialHost.ssl),
331-
sPos,
332-
ePos,
333-
i,
334-
slotInfo.size - 2)
346+
val endpoint = RedisEndpoint(
347+
host = host,
348+
port = port,
349+
user = initialHost.user,
350+
auth = initialHost.auth,
351+
dbNum = initialHost.dbNum,
352+
timeout = initialHost.timeout,
353+
ssl = initialHost.ssl)
354+
RedisNode(endpoint, sPos, ePos, i, slotInfo.size - 2)
335355
})
336356
}
337357
}.toArray

0 commit comments

Comments
 (0)