From fd94ac00f3b9a3b27f3ab7847cd7fe35b1e29fb9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=97=90=EC=8A=A4=EC=94=A8=EC=BB=B4=EC=A6=88=28=EC=A3=BC?= =?UTF-8?q?=29=20=EA=B9=80=ED=98=95=EB=A5=99?= Date: Wed, 28 Nov 2018 17:11:16 +0900 Subject: [PATCH] add geoRedis --- .../provider/redis/redisFunctions.scala | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala b/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala index d81a88ad..7bf7c912 100644 --- a/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala +++ b/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala @@ -242,6 +242,18 @@ class RedisContext(@transient val sc: SparkContext) extends Serializable { } } + /** + * @param kvs RDD of + * @param geoName target geo's name which hold all the kvs + * @param ttl time to live + */ + def toRedisGEO(kvs: RDD[(Double,Double,String)], geoName: String, ttl: Int = 0) + (implicit + redisConfig: RedisConfig = RedisConfig.fromSparkConf(sc.getConf), + readWriteConfig: ReadWriteConfig = ReadWriteConfig.fromSparkConf(sc.getConf)) { + kvs.foreachPartition(partition => setGeo(geoName, partition, ttl, redisConfig, readWriteConfig)) + } + /** * @param kvs Pair RDD of K/V * @param ttl time to live @@ -359,6 +371,23 @@ object RedisContext extends Serializable { conn.close() } + /** + * @param geoName + * @param arr k/vs which should be saved in the target host + * save all the k/vs to zsetName(zset type) to the target host + * @param ttl time to live + */ + def setGeo(geoName: String, arr: Iterator[(Double,Double,String)], ttl: Int, redisConfig: RedisConfig, readWriteConfig: ReadWriteConfig) { + implicit val rwConf: ReadWriteConfig = readWriteConfig + val conn = redisConfig.connectionForKey(geoName) + val pipeline = foreachWithPipelineNoLastSync(conn, arr) { case (pipeline, (lon,lat,mem)) => + pipeline.geoadd(geoName,lon,lat,mem) + } + if (ttl > 0) pipeline.expire(geoName, ttl) + pipeline.sync() + conn.close() + } + /** * @param zsetName * @param arr k/vs which should be saved in the target host