From 4417b0f97e56253f39d6353e42603114293afd4a Mon Sep 17 00:00:00 2001
From: Jonathan Zittritsch
Date: Thu, 10 Dec 2020 11:49:28 -0500
Subject: [PATCH 1/2] initial changes

---
 .../sql/redis/BinaryNXRedisPersistence.scala | 18 ++++++++++
 .../sql/redis/HashNXRedisPersistence.scala   | 15 ++++++++
 .../spark/sql/redis/RedisPersistence.scala   | 36 ++++++++++---------
 .../org/apache/spark/sql/redis/redis.scala   | 10 +++---
 4 files changed, 58 insertions(+), 21 deletions(-)
 create mode 100644 src/main/scala/org/apache/spark/sql/redis/BinaryNXRedisPersistence.scala
 create mode 100644 src/main/scala/org/apache/spark/sql/redis/HashNXRedisPersistence.scala

diff --git a/src/main/scala/org/apache/spark/sql/redis/BinaryNXRedisPersistence.scala b/src/main/scala/org/apache/spark/sql/redis/BinaryNXRedisPersistence.scala
new file mode 100644
index 00000000..b76937cd
--- /dev/null
+++ b/src/main/scala/org/apache/spark/sql/redis/BinaryNXRedisPersistence.scala
@@ -0,0 +1,18 @@
+package org.apache.spark.sql.redis
+
+import java.nio.charset.StandardCharsets.UTF_8
+
+import redis.clients.jedis.Pipeline
+import redis.clients.jedis.params.SetParams
+
+class BinaryNXRedisPersistence extends BinaryRedisPersistence {
+
+  override def save(pipeline: Pipeline, key: String, value: Array[Byte], ttl: Int): Unit = {
+    val keyBytes = key.getBytes(UTF_8)
+    val setParameters = SetParams.setParams()
+      .nx()
+      .ex(ttl)
+    pipeline.set(keyBytes, value, setParameters)
+  }
+
+}
diff --git a/src/main/scala/org/apache/spark/sql/redis/HashNXRedisPersistence.scala b/src/main/scala/org/apache/spark/sql/redis/HashNXRedisPersistence.scala
new file mode 100644
index 00000000..4f83650c
--- /dev/null
+++ b/src/main/scala/org/apache/spark/sql/redis/HashNXRedisPersistence.scala
@@ -0,0 +1,15 @@
+package org.apache.spark.sql.redis
+
+import redis.clients.jedis.Pipeline
+
+class HashNXRedisPersistence extends HashRedisPersistence {
+
+  override def save(pipeline: Pipeline, key: String, value: Any, ttl: Int): Unit = {
+    val javaValue = value.asInstanceOf[Map[String, String]]
+    javaValue.keySet.foreach( field => pipeline.hsetnx(key, field, javaValue(field)))
+    if (ttl > 0) {
+      pipeline.expire(key, ttl)
+    }
+  }
+
+}
diff --git a/src/main/scala/org/apache/spark/sql/redis/RedisPersistence.scala b/src/main/scala/org/apache/spark/sql/redis/RedisPersistence.scala
index d69eef66..ff60313f 100644
--- a/src/main/scala/org/apache/spark/sql/redis/RedisPersistence.scala
+++ b/src/main/scala/org/apache/spark/sql/redis/RedisPersistence.scala
@@ -5,8 +5,8 @@ import org.apache.spark.sql.types.StructType
 import redis.clients.jedis.Pipeline
 
 /**
-  * @author The Viet Nguyen
-  */
+ * @author The Viet Nguyen
+ */
 trait RedisPersistence[T] extends Serializable {
 
   def save(pipeline: Pipeline, key: String, value: T, ttl: Int): Unit
@@ -14,23 +14,23 @@
   def load(pipeline: Pipeline, key: String, requiredColumns: Seq[String]): Unit
 
   /**
-    * Encode dataframe row before storing it in Redis.
-    *
-    * @param keyName field name that should be encoded in special way, e.g. in Redis keys.
-    * @param value row to encode.
-    * @return encoded row
-    */
+   * Encode dataframe row before storing it in Redis.
+   *
+   * @param keyName field name that should be encoded in special way, e.g. in Redis keys.
+   * @param value row to encode.
+   * @return encoded row
+   */
   def encodeRow(keyName: String, value: Row): T
 
   /**
-    * Decode dataframe row stored in Redis.
-    *
-    * @param keyMap extracted name/value of key column from Redis key
-    * @param value encoded row
-    * @param schema row schema
-    * @param requiredColumns required columns to decode
-    * @return decoded row
-    */
+   * Decode dataframe row stored in Redis.
+   *
+   * @param keyMap extracted name/value of key column from Redis key
+   * @param value encoded row
+   * @param schema row schema
+   * @param requiredColumns required columns to decode
+   * @return decoded row
+   */
   def decodeRow(keyMap: (String, String), value: T, schema: StructType,
                 requiredColumns: Seq[String]): Row
 }
@@ -39,7 +39,9 @@ object RedisPersistence {
 
   private val providers =
     Map(SqlOptionModelBinary -> new BinaryRedisPersistence(),
-      SqlOptionModelHash -> new HashRedisPersistence())
+      SqlOptionModelHash -> new HashRedisPersistence(),
+      SqlOptionModelHashNoOpIfExists -> new HashNXRedisPersistence(),
+      SqlOptionModelBinaryNoOpIfExists -> new BinaryNXRedisPersistence())
 
   def apply(model: String): RedisPersistence[Any] = {
     // use hash model by default
diff --git a/src/main/scala/org/apache/spark/sql/redis/redis.scala b/src/main/scala/org/apache/spark/sql/redis/redis.scala
index 82b1c1d7..fc901171 100644
--- a/src/main/scala/org/apache/spark/sql/redis/redis.scala
+++ b/src/main/scala/org/apache/spark/sql/redis/redis.scala
@@ -1,8 +1,8 @@
 package org.apache.spark.sql
 
 /**
-  * @author The Viet Nguyen
-  */
+ * @author The Viet Nguyen
+ */
 package object redis {
 
   val RedisFormat = "org.apache.spark.sql.redis"
@@ -10,14 +10,16 @@
   val SqlOptionFilterKeysByType = "filter.keys.by.type"
   val SqlOptionNumPartitions = "partitions.number"
   /**
-    * Default read operation number of partitions.
-    */
+   * Default read operation number of partitions.
+   */
   val SqlOptionNumPartitionsDefault = 3
   val SqlOptionTableName = "table"
   val SqlOptionKeysPattern = "keys.pattern"
   val SqlOptionModel = "model"
   val SqlOptionModelBinary = "binary"
   val SqlOptionModelHash = "hash"
+  val SqlOptionModelHashNoOpIfExists = "hashnx"
+  val SqlOptionModelBinaryNoOpIfExists = "binarynx"
   val SqlOptionInferSchema = "infer.schema"
   val SqlOptionKeyColumn = "key.column"
   val SqlOptionTTL = "ttl"

From 68193d25ac3cd8268bb07d63f8652378a3ae04ba Mon Sep 17 00:00:00 2001
From: Jonathan Zittritsch
Date: Thu, 10 Dec 2020 11:53:15 -0500
Subject: [PATCH 2/2] format changes unnecessary

---
 .../spark/sql/redis/RedisPersistence.scala | 32 +++++++++----------
 .../org/apache/spark/sql/redis/redis.scala |  8 ++---
 2 files changed, 20 insertions(+), 20 deletions(-)

diff --git a/src/main/scala/org/apache/spark/sql/redis/RedisPersistence.scala b/src/main/scala/org/apache/spark/sql/redis/RedisPersistence.scala
index ff60313f..c712184c 100644
--- a/src/main/scala/org/apache/spark/sql/redis/RedisPersistence.scala
+++ b/src/main/scala/org/apache/spark/sql/redis/RedisPersistence.scala
@@ -5,8 +5,8 @@ import org.apache.spark.sql.types.StructType
 import redis.clients.jedis.Pipeline
 
 /**
- * @author The Viet Nguyen
- */
+  * @author The Viet Nguyen
+  */
 trait RedisPersistence[T] extends Serializable {
 
   def save(pipeline: Pipeline, key: String, value: T, ttl: Int): Unit
@@ -14,23 +14,23 @@
   def load(pipeline: Pipeline, key: String, requiredColumns: Seq[String]): Unit
 
   /**
-   * Encode dataframe row before storing it in Redis.
-   *
-   * @param keyName field name that should be encoded in special way, e.g. in Redis keys.
-   * @param value row to encode.
-   * @return encoded row
-   */
+    * Encode dataframe row before storing it in Redis.
+    *
+    * @param keyName field name that should be encoded in special way, e.g. in Redis keys.
+    * @param value row to encode.
+    * @return encoded row
+    */
   def encodeRow(keyName: String, value: Row): T
 
   /**
-   * Decode dataframe row stored in Redis.
-   *
-   * @param keyMap extracted name/value of key column from Redis key
-   * @param value encoded row
-   * @param schema row schema
-   * @param requiredColumns required columns to decode
-   * @return decoded row
-   */
+    * Decode dataframe row stored in Redis.
+    *
+    * @param keyMap extracted name/value of key column from Redis key
+    * @param value encoded row
+    * @param schema row schema
+    * @param requiredColumns required columns to decode
+    * @return decoded row
+    */
   def decodeRow(keyMap: (String, String), value: T, schema: StructType,
                 requiredColumns: Seq[String]): Row
 }
diff --git a/src/main/scala/org/apache/spark/sql/redis/redis.scala b/src/main/scala/org/apache/spark/sql/redis/redis.scala
index fc901171..ffe755b8 100644
--- a/src/main/scala/org/apache/spark/sql/redis/redis.scala
+++ b/src/main/scala/org/apache/spark/sql/redis/redis.scala
@@ -1,8 +1,8 @@
 package org.apache.spark.sql
 
 /**
- * @author The Viet Nguyen
- */
+  * @author The Viet Nguyen
+  */
 package object redis {
 
   val RedisFormat = "org.apache.spark.sql.redis"
@@ -10,8 +10,8 @@
   val SqlOptionFilterKeysByType = "filter.keys.by.type"
   val SqlOptionNumPartitions = "partitions.number"
   /**
-   * Default read operation number of partitions.
-   */
+    * Default read operation number of partitions.
+    */
   val SqlOptionNumPartitionsDefault = 3
   val SqlOptionTableName = "table"
   val SqlOptionKeysPattern = "keys.pattern"