From 106f1215fe0c59dc6509205099ad7e6a44f79db3 Mon Sep 17 00:00:00 2001 From: vincent Date: Thu, 8 Jun 2023 10:25:00 +0800 Subject: [PATCH] feat: zRange add key:value:score --- .../com/redislabs/provider/redis/rdd/RedisRDD.scala | 12 ++++++++++++ .../redislabs/provider/redis/redisFunctions.scala | 13 +++++++++++++ 2 files changed, 25 insertions(+) diff --git a/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala b/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala index 40417451..86ba5aae 100644 --- a/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala +++ b/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala @@ -157,6 +157,11 @@ class RedisZSetRDD[T: ClassTag](prev: RDD[String], nodeKeys.flatMap{k => ignoreJedisWrongTypeException(Try(conn.zrange(k, startPos, endPos))).get }.flatten.iterator + } else if (classTag[T] == classTag[(String, String, Double)]) { + nodeKeys.flatMap { k => + val a = ignoreJedisWrongTypeException(Try(conn.zrangeWithScores(k, startPos, endPos))).get + a.get.map(x => (k, x.getElement, x.getScore)) + }.iterator } else { throw new scala.Exception("Unknown RedisZSetRDD type") } @@ -330,6 +335,13 @@ class RedisKeysRDD(sc: SparkContext, new RedisZSetRDD(this, zsetContext, classOf[String], readWriteConfig) } + + def getZSetWithKey(): RDD[(String, String, Double)] = { + val zsetContext: ZSetContext = new ZSetContext(0, -1, Double.MinValue, Double.MaxValue, true, "byRange") + new RedisZSetRDD(this, zsetContext, classOf[(String, String, Double)], readWriteConfig) + } + + /** * filter the 'zset' type keys and get all the elements(with scores) of them * diff --git a/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala b/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala index ccf608ff..223ca12f 100644 --- a/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala +++ b/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala @@ -136,6 +136,19 @@ class RedisContext(@transient val sc: SparkContext) extends Serializable { } } + def fromRedisZSetWithKey[T](keysOrKeyPattern: T, + partitionNum: Int = 3) + (implicit + redisConfig: RedisConfig = RedisConfig.fromSparkConf(sc.getConf), + readWriteConfig: ReadWriteConfig = ReadWriteConfig.fromSparkConf(sc.getConf)): + RDD[(String, String, Double)] = { + keysOrKeyPattern match { + case keyPattern: String => fromRedisKeyPattern(keyPattern, partitionNum).getZSetWithKey() + case keys: Array[String] => fromRedisKeys(keys, partitionNum).getZSetWithKey() + case _ => throw new scala.Exception(IncorrectKeysOrKeyPatternMsg) + } + } + /** * @param keysOrKeyPattern an array of keys or a key pattern * @param partitionNum number of partitions