diff --git a/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala b/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala index 40417451..86ba5aae 100644 --- a/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala +++ b/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala @@ -157,6 +157,11 @@ class RedisZSetRDD[T: ClassTag](prev: RDD[String], nodeKeys.flatMap{k => ignoreJedisWrongTypeException(Try(conn.zrange(k, startPos, endPos))).get }.flatten.iterator + } else if (classTag[T] == classTag[(String, String, Double)]) { + nodeKeys.flatMap { k => + val a = ignoreJedisWrongTypeException(Try(conn.zrangeWithScores(k, startPos, endPos))).get + a.get.map(x => (k, x.getElement, x.getScore)) + }.iterator } else { throw new scala.Exception("Unknown RedisZSetRDD type") } @@ -330,6 +335,13 @@ class RedisKeysRDD(sc: SparkContext, new RedisZSetRDD(this, zsetContext, classOf[String], readWriteConfig) } + + def getZSetWithKey(): RDD[(String, String, Double)] = { + val zsetContext: ZSetContext = new ZSetContext(0, -1, Double.MinValue, Double.MaxValue, true, "byRange") + new RedisZSetRDD(this, zsetContext, classOf[(String, String, Double)], readWriteConfig) + } + + /** * filter the 'zset' type keys and get all the elements(with scores) of them * diff --git a/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala b/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala index ccf608ff..223ca12f 100644 --- a/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala +++ b/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala @@ -136,6 +136,19 @@ class RedisContext(@transient val sc: SparkContext) extends Serializable { } } + def fromRedisZSetWithKey[T](keysOrKeyPattern: T, + partitionNum: Int = 3) + (implicit + redisConfig: RedisConfig = RedisConfig.fromSparkConf(sc.getConf), + readWriteConfig: ReadWriteConfig = ReadWriteConfig.fromSparkConf(sc.getConf)): + RDD[(String, String, Double)] = { + keysOrKeyPattern match { + case keyPattern: String => fromRedisKeyPattern(keyPattern, partitionNum).getZSetWithKey() + case keys: Array[String] => fromRedisKeys(keys, partitionNum).getZSetWithKey() + case _ => throw new scala.Exception(IncorrectKeysOrKeyPatternMsg) + } + } + /** * @param keysOrKeyPattern an array of keys or a key pattern * @param partitionNum number of partitions