diff --git a/src/main/scala/org/apache/spark/sql/redis/BinaryNXRedisPersistence.scala b/src/main/scala/org/apache/spark/sql/redis/BinaryNXRedisPersistence.scala new file mode 100644 index 00000000..b76937cd --- /dev/null +++ b/src/main/scala/org/apache/spark/sql/redis/BinaryNXRedisPersistence.scala @@ -0,0 +1,18 @@ +package org.apache.spark.sql.redis + +import java.nio.charset.StandardCharsets.UTF_8 + +import redis.clients.jedis.Pipeline +import redis.clients.jedis.params.SetParams + +class BinaryNXRedisPersistence extends BinaryRedisPersistence { + + override def save(pipeline: Pipeline, key: String, value: Array[Byte], ttl: Int): Unit = { + val keyBytes = key.getBytes(UTF_8) + val setParameters = SetParams.setParams() + .nx() + .ex(ttl) + pipeline.set(keyBytes, value, setParameters) + } + +} diff --git a/src/main/scala/org/apache/spark/sql/redis/HashNXRedisPersistence.scala b/src/main/scala/org/apache/spark/sql/redis/HashNXRedisPersistence.scala new file mode 100644 index 00000000..4f83650c --- /dev/null +++ b/src/main/scala/org/apache/spark/sql/redis/HashNXRedisPersistence.scala @@ -0,0 +1,15 @@ +package org.apache.spark.sql.redis + +import redis.clients.jedis.Pipeline + +class HashNXRedisPersistence extends HashRedisPersistence { + + override def save(pipeline: Pipeline, key: String, value: Any, ttl: Int): Unit = { + val javaValue = value.asInstanceOf[Map[String, String]] + javaValue.keySet.foreach( field => pipeline.hsetnx(key, field, javaValue(field))) + if (ttl > 0) { + pipeline.expire(key, ttl) + } + } + +} diff --git a/src/main/scala/org/apache/spark/sql/redis/RedisPersistence.scala b/src/main/scala/org/apache/spark/sql/redis/RedisPersistence.scala index d69eef66..c712184c 100644 --- a/src/main/scala/org/apache/spark/sql/redis/RedisPersistence.scala +++ b/src/main/scala/org/apache/spark/sql/redis/RedisPersistence.scala @@ -39,7 +39,9 @@ object RedisPersistence { private val providers = Map(SqlOptionModelBinary -> new BinaryRedisPersistence(), - SqlOptionModelHash -> new HashRedisPersistence()) + SqlOptionModelHash -> new HashRedisPersistence(), + SqlOptionModelHashNoOpIfExists -> new HashNXRedisPersistence(), + SqlOptionModelBinaryNoOpIfExists -> new BinaryNXRedisPersistence()) def apply(model: String): RedisPersistence[Any] = { // use hash model by default diff --git a/src/main/scala/org/apache/spark/sql/redis/redis.scala b/src/main/scala/org/apache/spark/sql/redis/redis.scala index 82b1c1d7..ffe755b8 100644 --- a/src/main/scala/org/apache/spark/sql/redis/redis.scala +++ b/src/main/scala/org/apache/spark/sql/redis/redis.scala @@ -18,6 +18,8 @@ package object redis { val SqlOptionModel = "model" val SqlOptionModelBinary = "binary" val SqlOptionModelHash = "hash" + val SqlOptionModelHashNoOpIfExists = "hashnx" + val SqlOptionModelBinaryNoOpIfExists = "binarynx" val SqlOptionInferSchema = "infer.schema" val SqlOptionKeyColumn = "key.column" val SqlOptionTTL = "ttl"