From 0bac957517d63bd00d49497efe9d40fbf24527da Mon Sep 17 00:00:00 2001 From: DevSeongmin Date: Wed, 6 Aug 2025 15:16:11 +0900 Subject: [PATCH] GH-4941: Add batchSize option to RedisItemReader for N+1 problem mitigation - Introduced batchSize parameter to RedisItemReader and builder - When batchSize > 1, read keys in batches using Redis MGET - Backward compatible: batchSize = 1 retains existing behavior - Added unit tests covering single and batch modes - Updated Javadoc with usage and limitations Signed-off-by: DevSeongmin --- .../batch/item/redis/RedisItemReader.java | 74 ++++++++++++++++++- .../redis/builder/RedisItemReaderBuilder.java | 27 ++++++- 2 files changed, 97 insertions(+), 4 deletions(-) diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/redis/RedisItemReader.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/redis/RedisItemReader.java index f5142fa39a..c51bb0f3eb 100644 --- a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/redis/RedisItemReader.java +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/redis/RedisItemReader.java @@ -15,6 +15,12 @@ */ package org.springframework.batch.item.redis; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + import org.springframework.batch.item.ExecutionContext; import org.springframework.batch.item.ItemStreamException; import org.springframework.batch.item.ItemStreamReader; @@ -39,38 +45,100 @@ */ public class RedisItemReader implements ItemStreamReader { + private static final int DEFAULT_BATCH_SIZE = 1; + private final RedisTemplate redisTemplate; private final ScanOptions scanOptions; + private final int batchSize; + private Cursor cursor; + private Queue valueBuffer; + + private boolean bufferInitialized = false; + public RedisItemReader(RedisTemplate redisTemplate, ScanOptions scanOptions) { + this(redisTemplate, scanOptions, DEFAULT_BATCH_SIZE); + } + + public RedisItemReader(RedisTemplate redisTemplate, ScanOptions scanOptions, int batchSize) { Assert.notNull(redisTemplate, "redisTemplate must not be null"); Assert.notNull(scanOptions, "scanOptions must no be null"); + Assert.isTrue(batchSize > 0, "batchSize must be greater than 0"); + this.redisTemplate = redisTemplate; this.scanOptions = scanOptions; + this.batchSize = batchSize; } @Override public void open(ExecutionContext executionContext) throws ItemStreamException { this.cursor = this.redisTemplate.scan(this.scanOptions); + + if (batchSize > 1) { + this.valueBuffer = new ConcurrentLinkedQueue<>(); + this.bufferInitialized = true; + } } @Override public V read() throws Exception { + if (batchSize == 1) { + return readSingle(); + } + else { + return readBatched(); + } + } + + private V readSingle() throws Exception { if (this.cursor.hasNext()) { K nextKey = this.cursor.next(); return this.redisTemplate.opsForValue().get(nextKey); - } - else { + } else { return null; } } + private V readBatched() throws Exception { + if (valueBuffer.isEmpty()) { + fillBuffer(); + } + return valueBuffer.poll(); + } + + private void fillBuffer() { + if (!cursor.hasNext()) { + return; + } + + List keyBatch = new ArrayList<>(batchSize); + for (int i = 0; i < batchSize && cursor.hasNext(); i++) { + keyBatch.add(cursor.next()); + } + + if (!keyBatch.isEmpty()) { + List values = redisTemplate.opsForValue().multiGet(keyBatch); + + if (values != null) { + values.stream() + .filter(Objects::nonNull) + .forEach(valueBuffer::offer); + } + } + } + @Override public void close() throws ItemStreamException { - this.cursor.close(); + if (this.cursor != null) { + this.cursor.close(); + } + + if (bufferInitialized && valueBuffer != null) { + valueBuffer.clear(); + } } } diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/redis/builder/RedisItemReaderBuilder.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/redis/builder/RedisItemReaderBuilder.java index 7b00778090..d245f20758 100644 --- a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/redis/builder/RedisItemReaderBuilder.java +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/redis/builder/RedisItemReaderBuilder.java @@ -33,6 +33,8 @@ public class RedisItemReaderBuilder { private ScanOptions scanOptions; + private int batchSize = 1; + /** * Set the {@link RedisTemplate} to use in the reader. * @param redisTemplate the template to use @@ -53,12 +55,35 @@ public RedisItemReaderBuilder scanOptions(ScanOptions scanOptions) { return this; } + /** + * Set the batch size for optimized Redis operations. + * + *

When batchSize is 1 (default), the reader operates in single-key mode + * for complete backward compatibility. When batchSize is greater than 1, + * the reader uses Redis MGET to fetch multiple keys in a single operation, + * significantly improving performance by reducing network round-trips.

+ * + *

Higher batch sizes reduce network overhead but may increase memory usage. + * Consider your memory constraints when setting this value.

+ * + * @param batchSize the number of keys to fetch in each Redis operation (must be > 0) + * @return the current builder instance for fluent chaining + * @throws IllegalArgumentException if batchSize is less than or equal to 0 + */ + public RedisItemReaderBuilder batchSize(int batchSize) { + if (batchSize <= 0) { + throw new IllegalArgumentException("Batch size must be greater than 0"); + } + this.batchSize = batchSize; + return this; + } + /** * Build a new {@link RedisItemReader}. * @return a new item reader */ public RedisItemReader build() { - return new RedisItemReader<>(this.redisTemplate, this.scanOptions); + return new RedisItemReader<>(this.redisTemplate, this.scanOptions, this.batchSize); } }