Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
*/
package org.springframework.batch.item.redis;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;
Expand All @@ -39,38 +45,100 @@
*/
public class RedisItemReader<K, V> implements ItemStreamReader<V> {

private static final int DEFAULT_BATCH_SIZE = 1;

private final RedisTemplate<K, V> redisTemplate;

private final ScanOptions scanOptions;

private final int batchSize;

private Cursor<K> cursor;

private Queue<V> valueBuffer;

private boolean bufferInitialized = false;

public RedisItemReader(RedisTemplate<K, V> redisTemplate, ScanOptions scanOptions) {
this(redisTemplate, scanOptions, DEFAULT_BATCH_SIZE);
}

public RedisItemReader(RedisTemplate<K, V> redisTemplate, ScanOptions scanOptions, int batchSize) {
Assert.notNull(redisTemplate, "redisTemplate must not be null");
Assert.notNull(scanOptions, "scanOptions must no be null");
Assert.isTrue(batchSize > 0, "batchSize must be greater than 0");

this.redisTemplate = redisTemplate;
this.scanOptions = scanOptions;
this.batchSize = batchSize;
}

@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
this.cursor = this.redisTemplate.scan(this.scanOptions);

if (batchSize > 1) {
this.valueBuffer = new ConcurrentLinkedQueue<>();
this.bufferInitialized = true;
}
}

@Override
public V read() throws Exception {
if (batchSize == 1) {
return readSingle();
}
else {
return readBatched();
}
}

private V readSingle() throws Exception {
if (this.cursor.hasNext()) {
K nextKey = this.cursor.next();
return this.redisTemplate.opsForValue().get(nextKey);
}
else {
} else {
return null;
}
}

private V readBatched() throws Exception {
if (valueBuffer.isEmpty()) {
fillBuffer();
}
return valueBuffer.poll();
}

private void fillBuffer() {
if (!cursor.hasNext()) {
return;
}

List<K> keyBatch = new ArrayList<>(batchSize);
for (int i = 0; i < batchSize && cursor.hasNext(); i++) {
keyBatch.add(cursor.next());
}

if (!keyBatch.isEmpty()) {
List<V> values = redisTemplate.opsForValue().multiGet(keyBatch);

if (values != null) {
values.stream()
.filter(Objects::nonNull)
.forEach(valueBuffer::offer);
}
}
}

@Override
public void close() throws ItemStreamException {
this.cursor.close();
if (this.cursor != null) {
this.cursor.close();
}

if (bufferInitialized && valueBuffer != null) {
valueBuffer.clear();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class RedisItemReaderBuilder<K, V> {

private ScanOptions scanOptions;

private int batchSize = 1;

/**
* Set the {@link RedisTemplate} to use in the reader.
* @param redisTemplate the template to use
Expand All @@ -53,12 +55,35 @@ public RedisItemReaderBuilder<K, V> scanOptions(ScanOptions scanOptions) {
return this;
}

/**
* Set the batch size for optimized Redis operations.
*
* <p>When batchSize is 1 (default), the reader operates in single-key mode
* for complete backward compatibility. When batchSize is greater than 1,
* the reader uses Redis MGET to fetch multiple keys in a single operation,
* significantly improving performance by reducing network round-trips.</p>
*
* <p>Higher batch sizes reduce network overhead but may increase memory usage.
* Consider your memory constraints when setting this value.</p>
*
* @param batchSize the number of keys to fetch in each Redis operation (must be > 0)
* @return the current builder instance for fluent chaining
* @throws IllegalArgumentException if batchSize is less than or equal to 0
*/
public RedisItemReaderBuilder<K, V> batchSize(int batchSize) {
if (batchSize <= 0) {
throw new IllegalArgumentException("Batch size must be greater than 0");
}
this.batchSize = batchSize;
return this;
}

/**
* Build a new {@link RedisItemReader}.
* @return a new item reader
*/
public RedisItemReader<K, V> build() {
return new RedisItemReader<>(this.redisTemplate, this.scanOptions);
return new RedisItemReader<>(this.redisTemplate, this.scanOptions, this.batchSize);
}

}