diff --git a/.gitignore b/.gitignore
index 52ebc5d2..d18a4343 100644
--- a/.gitignore
+++ b/.gitignore
@@ -32,3 +32,7 @@ src_managed/
project/boot/
project/plugins/project/
build/*.jar
+
+.DS_Store
+
+scalastyle-output.xml
diff --git a/.travis.yml b/.travis.yml
index d748c887..ccfd143f 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -3,7 +3,7 @@ language: scala
scala:
- 2.10.4
before_install:
- - git clone https://github.com/antirez/redis.git redis_for_spark-redis_test || true
+ - git clone --branch 3.0.0 https://github.com/antirez/redis.git redis_for_spark-redis_test || true
install:
- make -C redis_for_spark-redis_test -j4
script: make test
diff --git a/pom.xml b/pom.xml
index da2d0c05..6c8a8550 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
4.0.0
com.redislabs
spark-redis
- 0.3.2
+ 0.3.4-SNAPSHOT
Spark-Redis
A Spark library for Redis
http://github.com/RedisLabs/spark-redis
diff --git a/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala b/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala
index 6ba23f6e..de8bab94 100644
--- a/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala
+++ b/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala
@@ -65,6 +65,32 @@ class RedisKVRDD(prev: RDD[String],
}
}
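+/**
+ * RDD of (key, field-value map) pairs, one entry per 'hash' type key
+ */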
+class RedisKMapRDD(prev: RDD[String]) extends RDD[(String, Map[String, String])](prev) with Keys {
+
+ override def getPartitions: Array[Partition] = prev.partitions
+
+ override def compute(split: Partition,
+ context: TaskContext): Iterator[(String, Map[String, String])] = {
+ val partition: RedisPartition = split.asInstanceOf[RedisPartition]
+ val sPos = partition.slots._1
+ val ePos = partition.slots._2
+ val nodes = partition.redisConfig.getNodesBySlots(sPos, ePos)
+ val keys = firstParent[String].iterator(split, context)
+    groupKeysByNode(nodes, keys).flatMap {
+      case (node, nodeKeys) => {
+        val conn = node.endpoint.connect()
+        val hashKeys = filterKeysByType(conn, nodeKeys, "hash")
+        // materialize before closing the connection, then hand back an iterator
+        val res = hashKeys.map(key => {
+          val m: Map[String, String] = conn.hgetAll(key).toMap
+          (key, m)
+        }).iterator
+        conn.close
+        res
+      }
+    }.iterator
+ }
+}
+
class RedisListRDD(prev: RDD[String], val rddType: String) extends RDD[String](prev) with Keys {
override def getPartitions: Array[Partition] = prev.partitions
@@ -300,6 +326,10 @@ class RedisKeysRDD(sc: SparkContext,
def getHash(): RDD[(String, String)] = {
new RedisKVRDD(this, "hash")
}
+
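+  /**
+   * filter the 'hash' type keys and get the whole hash of each as a field-value map
+   * @return RedisKMapRDD of (key, Map(field -> value)) pairs
+   */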
+ def getHashX(): RDD[(String, Map[String, String])] = {
+ new RedisKMapRDD(this)
+ }
/**
* filter the 'zset' type keys and get all the elements(without scores) of them
* @return RedisZSetRDD[String]
diff --git a/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala b/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala
index 0a3de12d..5739662c 100644
--- a/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala
+++ b/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala
@@ -105,6 +105,13 @@ class RedisContext(@transient val sc: SparkContext) extends Serializable {
}
}
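+  /**
+   * @param keys an array of hash keys
+   * @param partitionNum number of partitions
+   * @return RDD of (hash key, field-value map) pairs
+   */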
+ def fromRedisHashX(keys: Array[String],
+ partitionNum: Int = 3)
+ (implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))):
+ RDD[(String, Map[String, String])] = {
+ fromRedisKeys(keys.toSet.toArray, partitionNum)(redisConfig).getHashX
+ }
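+  // usage sketch (assumes hashes were written beforehand, e.g. with sc.toRedisHASH
+  // as in the test suite):
+  //   val rdd: RDD[(String, Map[String, String])] =
+  //     sc.fromRedisHashX(Array("all:words:cnt:hash"))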
+
/**
* @param keysOrKeyPattern an array of keys or a key pattern
* @param partitionNum number of partitions
@@ -236,6 +243,25 @@ class RedisContext(@transient val sc: SparkContext) extends Serializable {
kvs.foreachPartition(partition => setHash(hashName, partition, ttl, redisConfig))
}
+
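+  /**
+   * @param keyMap Pair RDD of (hash name, field -> amount); every field of the
+   *               named hash is incremented by its amount (HINCRBY)
+   * @param ttl time to live of the target hashes
+   */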
+ def toRedisHASHIncrByLong(keyMap: RDD[(String, Map[String, Long])], ttl: Int = 0)
+ (implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))): Unit = {
+ keyMap.foreachPartition(partition => incrHash(partition, ttl, redisConfig))
+ }
+
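+  /**
+   * @param keyMap Pair RDD of (hash name, field -> amount); every field of the
+   *               named hash is incremented by its amount (HINCRBYFLOAT)
+   * @param ttl time to live of the target hashes
+   */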
+ def toRedisHASHIncrByFloat(keyMap: RDD[(String, Map[String, Double])], ttl: Int = 0)
+ (implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))): Unit = {
+ keyMap.foreachPartition(partition => incrFloatHash(partition, ttl, redisConfig))
+ }
/**
* @param kvs Pair RDD of K/V
* @param zsetName target zset's name which hold all the kvs
@@ -322,6 +348,60 @@ object RedisContext extends Serializable {
conn.close
}
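+  /**
+   * @param hashName target hash's name
+   * @param arr (field, amount) pairs applied to hashName via pipelined HINCRBY
+   * @param ttl time to live of the target hash
+   */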
+ def incrHash(hashName: String, arr: Iterator[(String, Long)], ttl: Int, redisConfig: RedisConfig): Unit = {
+ val conn = redisConfig.connectionForKey(hashName)
+ val pipeline = conn.pipelined()
+ arr.foreach( x => pipeline.hincrBy(hashName, x._1, x._2))
+ if (ttl > 0) pipeline.expire(hashName, ttl)
+ pipeline.sync()
+ conn.close()
+ }
+
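+  /**
+   * @param arr (hash name, field -> amount) pairs, grouped by owning node and
+   *            applied via pipelined HINCRBY
+   * @param ttl time to live of each target hash
+   */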
+  def incrHash(arr: Iterator[(String, Map[String, Long])], ttl: Int, redisConfig: RedisConfig): Unit = {
+    arr.toArray.groupBy(kv => redisConfig.getHost(kv._1)).foreach {
+      case (node, hashes) => {
+        val conn = node.endpoint.connect()
+        val pipeline = conn.pipelined
+        hashes.foreach { case (hashName, fields) =>
+          fields.foreach { case (field, amount) =>
+            pipeline.hincrBy(hashName, field, amount)
+          }
+          if (ttl > 0) pipeline.expire(hashName, ttl)
+        }
+        pipeline.sync
+        conn.close
+      }
+    }
+  }
+
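+  /**
+   * @param hashName target hash's name
+   * @param arr (field, amount) pairs applied to hashName via pipelined HINCRBYFLOAT
+   * @param ttl time to live of the target hash
+   */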
+ def incrFloatHash(hashName: String, arr: Iterator[(String, Double)], ttl: Int, redisConfig: RedisConfig): Unit = {
+ val conn = redisConfig.connectionForKey(hashName)
+ val pipeline = conn.pipelined()
+ arr.foreach( x => pipeline.hincrByFloat(hashName, x._1, x._2))
+ if (ttl > 0) pipeline.expire(hashName, ttl)
+ pipeline.sync()
+ conn.close()
+ }
+
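+  /**
+   * @param arr (hash name, field -> amount) pairs, grouped by owning node and
+   *            applied via pipelined HINCRBYFLOAT
+   * @param ttl time to live of each target hash
+   */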
+  def incrFloatHash(arr: Iterator[(String, Map[String, Double])], ttl: Int, redisConfig: RedisConfig): Unit = {
+    arr.toArray.groupBy(kv => redisConfig.getHost(kv._1)).foreach {
+      case (node, hashes) => {
+        val conn = node.endpoint.connect()
+        val pipeline = conn.pipelined
+        hashes.foreach { case (hashName, fields) =>
+          fields.foreach { case (field, amount) =>
+            pipeline.hincrByFloat(hashName, field, amount)
+          }
+          if (ttl > 0) pipeline.expire(hashName, ttl)
+        }
+        pipeline.sync
+        conn.close
+      }
+    }
+  }
+
/**
* @param zsetName
* @param arr k/vs which should be saved in the target host
diff --git a/src/test/scala/com/redislabs/provider/redis/rdd/RedisRDDClusterSuite.scala b/src/test/scala/com/redislabs/provider/redis/rdd/RedisRDDClusterSuite.scala
index d439a043..ecaa1d0a 100644
--- a/src/test/scala/com/redislabs/provider/redis/rdd/RedisRDDClusterSuite.scala
+++ b/src/test/scala/com/redislabs/provider/redis/rdd/RedisRDDClusterSuite.scala
@@ -19,8 +19,25 @@ class RedisRDDClusterSuite extends FunSuite with ENV with BeforeAndAfterAll with
getLines.toArray.mkString("\n")
- val wcnts = sc.parallelize(content.split("\\W+").filter(!_.isEmpty)).map((_, 1)).
- reduceByKey(_ + _).map(x => (x._1, x._2.toString))
+ val wcnt = sc.parallelize(content.split("\\W+").filter(!_.isEmpty)).map((_, 1)).
+ reduceByKey(_ + _)
+ val wcntl = wcnt.map(x => (x._1, x._2.toLong * 2))
+ val wcntd = wcnt.map(x => (x._1, x._2.toDouble + 0.31))
+ val wcnts = wcnt.map(x => (x._1, x._2.toString))
+
+    val wcntl2 = wcntl.map(kv => {
+      val m = Map(kv._1 -> kv._2)
+      ("all:words:cnt:hash:incr:long:2", m)
+    }).reduceByKey(_ ++ _)
+
+    val wcntd2 = wcntd.map(kv => {
+      val m = Map(kv._1 -> kv._2)
+      ("all:words:cnt:hash:incr:float:2", m)
+    }).reduceByKey(_ ++ _)
val wds = sc.parallelize(content.split("\\W+").filter(!_.isEmpty))
@@ -39,6 +56,13 @@ class RedisRDDClusterSuite extends FunSuite with ENV with BeforeAndAfterAll with
sc.toRedisHASH(wcnts, "all:words:cnt:hash")(redisConfig)
sc.toRedisLIST(wds, "all:words:list" )(redisConfig)
sc.toRedisSET(wds, "all:words:set")(redisConfig)
+
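+    // each increment op below is applied twice, so the long counts end up at
+    // 4x the word count and the float counts at (2 * count) + 0.62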
+ sc.toRedisHASHIncrByLong(wcntl2)(redisConfig)
+ sc.toRedisHASHIncrByLong(wcntl2)(redisConfig)
+ sc.toRedisHASHIncrByFloat(wcntd2)(redisConfig)
+ sc.toRedisHASHIncrByFloat(wcntd2)(redisConfig)
}
test("RedisKVRDD - default(cluster)") {
@@ -136,6 +160,96 @@ class RedisRDDClusterSuite extends FunSuite with ENV with BeforeAndAfterAll with
hashContents should be (wcnts)
}
+
+ test("RedisHashIncrByLongRDD - key map - default(cluster)") {
+ val redisHashRDD = sc.fromRedisHash("all:words:cnt:hash:incr:long:2")
+ val hashContents = redisHashRDD.sortByKey().collect
+ val wcntl = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
+ map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 4).toString)).toArray.sortBy(_._1)
+ hashContents should be (wcntl)
+ }
+
+ test("RedisHashIncrByLongRDD - key map - cluster") {
+ implicit val c: RedisConfig = redisConfig
+ val redisHashRDD = sc.fromRedisHash("all:words:cnt:hash:incr:long:2")
+ val hashContents = redisHashRDD.sortByKey().collect
+ val wcntl = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
+ map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 4).toString)).toArray.sortBy(_._1)
+ hashContents should be (wcntl)
+ }
+
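+  // the tests below pass a duplicate key to fromRedisHashX; keys are
+  // de-duplicated (keys.toSet), so exactly one entry comes back per hash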
+ test("RedisHashIncrByLongRDD - key map X - default(cluster)") {
+ val redisHashRDD = sc.fromRedisHashX(Array("all:words:cnt:hash:incr:long:2",
+ "all:words:cnt:hash:incr:long:2"))
+ val hashContents = redisHashRDD.collect
+ val hashValue = hashContents(0)._2.toArray.sortBy(_._1)
+ val wcntl = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
+ map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 4).toString)).toArray.sortBy(_._1)
+ hashValue should be (wcntl)
+ }
+
+ test("RedisHashIncrByLongRDD - key map X - cluster") {
+ implicit val c: RedisConfig = redisConfig
+ val redisHashRDD = sc.fromRedisHashX(Array("all:words:cnt:hash:incr:long:2",
+ "all:words:cnt:hash:incr:long:2"))
+ val hashContents = redisHashRDD.collect
+ val hashValue = hashContents(0)._2.toArray.sortBy(_._1)
+ val wcntl = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
+ map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 4).toString)).toArray.sortBy(_._1)
+ hashValue should be (wcntl)
+ }
+
+ test("RedisHashIncrByRDD - X - default(cluster)") {
+ val redisHashRDD = sc.fromRedisHashX(Array("all:words:cnt:hash:incr:long:2",
+ "all:words:cnt:hash:incr:float:2"))
+    redisHashRDD.collect.foreach(kv => {
+      if (kv._1.equals("all:words:cnt:hash:incr:long:2")) {
+        val hashValue = kv._2.toArray.sortBy(_._1)
+        val wcntl = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
+          map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 4).toString)).toArray.sortBy(_._1)
+        hashValue should be (wcntl)
+      } else if (kv._1.equals("all:words:cnt:hash:incr:float:2")) {
+        val hashValue = kv._2.toArray.sortBy(_._1)
+        val wcntd = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
+          map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 2 + 0.62).toString)).toArray.sortBy(_._1)
+        hashValue should be (wcntd)
+      }
+    })
+  }
+
test("RedisListRDD - default(cluster)") {
val redisListRDD = sc.fromRedisList( "all:words:list")
val listContents = redisListRDD.sortBy(x => x).collect