Skip to content

Commit 667b24d

Browse files
committed
Polish.
Closes #2466 See #2465
1 parent 03de6db commit 667b24d

File tree

7 files changed

+93
-71
lines changed

7 files changed

+93
-71
lines changed

src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import java.nio.ByteBuffer;
2222
import java.nio.charset.StandardCharsets;
23-
import java.time.Duration;
2423
import java.util.Arrays;
2524
import java.util.Collections;
2625
import java.util.Map;
@@ -59,6 +58,8 @@
5958
*
6059
* @author Mark Paluch
6160
* @author Christoph Strobl
61+
* @author Marcin Zielinski
62+
* @author John Blum
6263
* @since 2.2
6364
*/
6465
class DefaultReactiveStreamOperations<K, HK, HV> implements ReactiveStreamOperations<K, HK, HV> {
@@ -156,20 +157,12 @@ public Mono<RecordId> add(Record<K, ?> record) {
156157
* @see org.springframework.data.redis.core.ReactiveStreamOperations#delete(java.lang.Object, java.lang.String[])
157158
*/
158159
@Override
159-
public Flux<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, Duration minIdleTime,
160-
RecordId... recordIds) {
160+
public Flux<MapRecord<K, HK, HV>> claim(K key, String consumerGroup, String newOwner, XClaimOptions xClaimOptions) {
161161

162-
return createFlux(connection -> connection.xClaim(rawKey(key), group, newOwner, minIdleTime, recordIds)
162+
return createFlux(connection -> connection.xClaim(rawKey(key), consumerGroup, newOwner, xClaimOptions)
163163
.map(this::deserializeRecord));
164164
}
165165

166-
@Override
167-
public Flux<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, XClaimOptions xClaimOptions) {
168-
169-
return createFlux(
170-
connection -> connection.xClaim(rawKey(key), group, newOwner, xClaimOptions).map(this::deserializeRecord));
171-
}
172-
173166
@Override
174167
public Mono<Long> delete(K key, RecordId... recordIds) {
175168

src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package org.springframework.data.redis.core;
1717

1818
import java.nio.charset.StandardCharsets;
19-
import java.time.Duration;
2019
import java.util.ArrayList;
2120
import java.util.Arrays;
2221
import java.util.Collections;
@@ -43,6 +42,7 @@
4342
import org.springframework.data.redis.connection.stream.StreamReadOptions;
4443
import org.springframework.data.redis.hash.HashMapper;
4544
import org.springframework.data.redis.serializer.RedisSerializer;
45+
import org.springframework.data.redis.support.collections.CollectionUtils;
4646
import org.springframework.lang.Nullable;
4747
import org.springframework.util.Assert;
4848
import org.springframework.util.ClassUtils;
@@ -52,6 +52,8 @@
5252
*
5353
* @author Mark Paluch
5454
* @author Christoph Strobl
55+
* @author Marcin Zielinski
56+
* @author John Blum
5557
* @since 2.2
5658
*/
5759
class DefaultStreamOperations<K, HK, HV> extends AbstractOperations<K, Object> implements StreamOperations<K, HK, HV> {
@@ -147,32 +149,16 @@ public RecordId add(Record<K, ?> record) {
147149
* @see org.springframework.data.redis.core.StreamOperations#delete(java.lang.Object, java.lang.String[])
148150
*/
149151
@Override
150-
public List<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, Duration minIdleTime,
151-
RecordId... recordIds) {
152-
byte[] rawKey = rawKey(key);
152+
public List<MapRecord<K, HK, HV>> claim(K key, String consumerGroup, String newOwner, XClaimOptions xClaimOptions) {
153153

154-
return execute(new RecordDeserializingRedisCallback() {
154+
return CollectionUtils.nullSafeList(execute(new RecordDeserializingRedisCallback() {
155155

156156
@Nullable
157157
@Override
158158
List<ByteRecord> inRedis(RedisConnection connection) {
159-
return connection.streamCommands().xClaim(rawKey, group, newOwner, minIdleTime, recordIds);
159+
return connection.streamCommands().xClaim(rawKey(key), consumerGroup, newOwner, xClaimOptions);
160160
}
161-
});
162-
}
163-
164-
@Override
165-
public List<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, XClaimOptions xClaimOptions) {
166-
byte[] rawKey = rawKey(key);
167-
168-
return execute(new RecordDeserializingRedisCallback() {
169-
170-
@Nullable
171-
@Override
172-
List<ByteRecord> inRedis(RedisConnection connection) {
173-
return connection.streamCommands().xClaim(rawKey, group, newOwner, xClaimOptions);
174-
}
175-
});
161+
}));
176162
}
177163

178164
@Override

src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252
* @author Mark Paluch
5353
* @author Christoph Strobl
5454
* @author Dengliming
55+
* @author Marcin Zielinski
56+
* @author John Blum
5557
* @since 2.2
5658
*/
5759
public interface ReactiveStreamOperations<K, HK, HV> extends HashMapperProvider<HK, HV> {
@@ -139,33 +141,48 @@ default Mono<RecordId> add(MapRecord<K, ? extends HK, ? extends HV> record) {
139141
Mono<RecordId> add(Record<K, ?> record);
140142

141143
/**
142-
* Changes the ownership of a pending message, so that the new owner is the consumer specified as the command argument.
143-
* The message is claimed only if its idle time is greater the minimum idle time specified when calling XCLAIM
144+
* Changes the ownership of a pending message so that the new owner is the consumer specified as
145+
* the command argument.
144146
*
145-
* @param key the stream key.
146-
* @param group name of the consumer group.
147-
* @param newOwner name of the consumer claiming the message.
148-
* @param minIdleTime idle time required for a message to be claimed.
149-
* @param recordIds record IDs to be claimed
147+
* The message is claimed only if its idle time (ms) is greater than the {@link Duration minimum idle time}
148+
* specified when calling {@literal XCLAIM}.
150149
*
151-
* @return the {@link Flux} of claimed MapRecords.
150+
* @param key {@link K key} to the steam.
151+
* @param consumerGroup {@link String name} of the consumer group.
152+
* @param newOwner {@link String name} of the consumer claiming the message.
153+
* @param minIdleTime {@link Duration minimum idle time} required for a message to be claimed.
154+
* @param recordIds {@link RecordId record IDs} to be claimed.
155+
* @return {@link Flux} of claimed {@link MapRecord MapRecords}.
152156
* @see <a href="https://redis.io/commands/xclaim/">Redis Documentation: XCLAIM</a>
157+
* @see org.springframework.data.redis.connection.stream.MapRecord
158+
* @see org.springframework.data.redis.connection.stream.RecordId
159+
* @see #claim(Object, String, String, XClaimOptions)
160+
* @see reactor.core.publisher.Flux
153161
*/
154-
Flux<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, Duration minIdleTime, RecordId... recordIds);
162+
default Flux<MapRecord<K, HK, HV>> claim(K key, String consumerGroup, String newOwner, Duration minIdleTime,
163+
RecordId... recordIds) {
164+
165+
return claim(key, consumerGroup, newOwner, XClaimOptions.minIdle(minIdleTime).ids(recordIds));
166+
}
155167

156168
/**
157-
* Changes the ownership of a pending message, so that the new owner is the consumer specified as the command argument.
158-
* The message is claimed only if its idle time is greater the minimum idle time specified when calling XCLAIM
159-
*
160-
* @param key the stream key.
161-
* @param group name of the consumer group.
162-
* @param newOwner name of the consumer claiming the message.
163-
* @param xClaimOptions additional parameters for the CLAIM call.
169+
* Changes the ownership of a pending message so that the new owner is the consumer specified as
170+
* the command argument.
171+
172+
* The message is claimed only if its idle time (ms) is greater than the given {@link Duration minimum idle time}
173+
* specified when calling {@literal XCLAIM}.
164174
*
165-
* @return the {@link Flux} of claimed MapRecords.
175+
* @param key {@link K key} to the steam.
176+
* @param consumerGroup {@link String name} of the consumer group.
177+
* @param newOwner {@link String name} of the consumer claiming the message.
178+
* @param xClaimOptions additional parameters for the {@literal CLAIM} call.
179+
* @return a {@link Flux} of claimed {@link MapRecord MapRecords}.
166180
* @see <a href="https://redis.io/commands/xclaim/">Redis Documentation: XCLAIM</a>
181+
* @see org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions
182+
* @see org.springframework.data.redis.connection.stream.MapRecord
183+
* @see reactor.core.publisher.Flux
167184
*/
168-
Flux<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, XClaimOptions xClaimOptions);
185+
Flux<MapRecord<K, HK, HV>> claim(K key, String consumerGroup, String newOwner, XClaimOptions xClaimOptions);
169186

170187
/**
171188
* Removes the specified records from the stream. Returns the number of records deleted, that may be different from

src/main/java/org/springframework/data/redis/core/StreamOperations.java

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151
* @author Mark Paluch
5252
* @author Christoph Strobl
5353
* @author Dengliming
54+
* @author Marcin Zielinski
55+
* @author John Blum
5456
* @since 2.2
5557
*/
5658
public interface StreamOperations<K, HK, HV> extends HashMapperProvider<HK, HV> {
@@ -133,33 +135,46 @@ default RecordId add(MapRecord<K, ? extends HK, ? extends HV> record) {
133135
RecordId add(Record<K, ?> record);
134136

135137
/**
136-
* Changes the ownership of a pending message, so that the new owner is the consumer specified as the command argument.
137-
* The message is claimed only if its idle time is greater the minimum idle time specified when calling XCLAIM
138+
* Changes the ownership of a pending message so that the new owner is the consumer specified as
139+
* the command argument.
138140
*
139-
* @param key the stream key.
140-
* @param group name of the consumer group.
141-
* @param newOwner name of the consumer claiming the message.
142-
* @param minIdleTime idle time required for a message to be claimed.
143-
* @param recordIds record IDs to be claimed
141+
* The message is claimed only if its idle time (ms) is greater than the given {@link Duration minimum idle time}
142+
* specified when calling {@literal XCLAIM}.
144143
*
145-
* @return list of claimed MapRecords.
144+
* @param key {@link K key} to the steam.
145+
* @param consumerGroup {@link String name} of the consumer group.
146+
* @param newOwner {@link String name} of the consumer claiming the message.
147+
* @param minIdleTime {@link Duration minimum idle time} required for a message to be claimed.
148+
* @param recordIds {@link RecordId record IDs} to be claimed.
149+
* @return {@link List} of claimed {@link MapRecord MapRecords}.
146150
* @see <a href="https://redis.io/commands/xclaim/">Redis Documentation: XCLAIM</a>
151+
* @see org.springframework.data.redis.connection.stream.MapRecord
152+
* @see org.springframework.data.redis.connection.stream.RecordId
153+
* @see #claim(Object, String, String, XClaimOptions)
147154
*/
148-
List<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, Duration minIdleTime, RecordId... recordIds);
155+
default List<MapRecord<K, HK, HV>> claim(K key, String consumerGroup, String newOwner, Duration minIdleTime,
156+
RecordId... recordIds) {
157+
158+
return claim(key, consumerGroup, newOwner, XClaimOptions.minIdle(minIdleTime).ids(recordIds));
159+
}
149160

150161
/**
151-
* Changes the ownership of a pending message, so that the new owner is the consumer specified as the command argument.
152-
* The message is claimed only if its idle time is greater the minimum idle time specified when calling XCLAIM
162+
* Changes the ownership of a pending message so that the new owner is the consumer specified as
163+
* the command argument.
153164
*
154-
* @param key the stream key.
155-
* @param group name of the consumer group.
156-
* @param newOwner name of the consumer claiming the message.
157-
* @param xClaimOptions additional parameters for the CLAIM call.
165+
* The message is claimed only if its idle time (ms) is greater than the given {@link Duration minimum idle time}
166+
* specified when calling {@literal XCLAIM}.
158167
*
159-
* @return list of claimed MapRecords.
168+
* @param key {@link K key} to the steam.
169+
* @param consumerGroup {@link String name} of the consumer group.
170+
* @param newOwner {@link String name} of the consumer claiming the message.
171+
* @param xClaimOptions additional parameters for the {@literal CLAIM} call.
172+
* @return {@link List} of claimed {@link MapRecord MapRecords}.
160173
* @see <a href="https://redis.io/commands/xclaim/">Redis Documentation: XCLAIM</a>
174+
* @see org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions
175+
* @see org.springframework.data.redis.connection.stream.MapRecord
161176
*/
162-
List<MapRecord<K, HK, HV>> claim(K key, String group, String newOwner, XClaimOptions xClaimOptions);
177+
List<MapRecord<K, HK, HV>> claim(K key, String consumerGroup, String newOwner, XClaimOptions xClaimOptions);
163178

164179
/**
165180
* Removes the specified records from the stream. Returns the number of records deleted, that may be different from

src/main/java/org/springframework/data/redis/support/collections/CollectionUtils.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,22 @@
1818
import java.util.ArrayList;
1919
import java.util.Arrays;
2020
import java.util.Collection;
21+
import java.util.Collections;
2122
import java.util.List;
2223

2324
import org.springframework.dao.DataAccessException;
2425
import org.springframework.data.redis.core.RedisOperations;
2526
import org.springframework.data.redis.core.SessionCallback;
27+
import org.springframework.lang.NonNull;
28+
import org.springframework.lang.Nullable;
2629

2730
/**
2831
* Utility class used mainly for type conversion by the default collection implementations. Meant for internal use.
2932
*
3033
* @author Costin Leau
34+
* @author John Blum
3135
*/
32-
abstract class CollectionUtils {
36+
public abstract class CollectionUtils {
3337

3438
@SuppressWarnings("unchecked")
3539
static <E> Collection<E> reverse(Collection<? extends E> c) {
@@ -70,4 +74,9 @@ public Object execute(RedisOperations operations) throws DataAccessException {
7074
}
7175
});
7276
}
77+
78+
@NonNull
79+
public static <T> List<T> nullSafeList(@Nullable List<T> list) {
80+
return list != null ? list : Collections.emptyList();
81+
}
7382
}

src/test/java/org/springframework/data/redis/core/DefaultReactiveStreamOperationsIntegrationTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@
5959
* Integration tests for {@link DefaultReactiveStreamOperations}.
6060
*
6161
* @author Mark Paluch
62-
* @auhtor Christoph Strobl
62+
* @author Christoph Strobl
63+
* @author Marcin Zielinski
6364
*/
6465
@MethodSource("testParams")
6566
@SuppressWarnings("unchecked")
@@ -361,7 +362,7 @@ void pendingShouldReadMessageDetails() {
361362

362363
}
363364

364-
@ParameterizedRedisTest // https://github.com/spring-projects/spring-data-redis/issues/2465
365+
@ParameterizedRedisTest // GH-2465
365366
void claimShouldReadMessageDetails() {
366367

367368
K key = keyFactory.instance();

src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
*
5151
* @author Mark Paluch
5252
* @author Christoph Strobl
53+
* @author Marcin Zielinski
5354
*/
5455
@MethodSource("testParams")
5556
@EnabledOnCommand("XADD")
@@ -413,7 +414,7 @@ void pendingShouldReadMessageDetails() {
413414
assertThat(pending.get(0).getTotalDeliveryCount()).isOne();
414415
}
415416

416-
@ParameterizedRedisTest // https://github.com/spring-projects/spring-data-redis/issues/2465
417+
@ParameterizedRedisTest // GH-2465
417418
void claimShouldReadMessageDetails() {
418419

419420
K key = keyFactory.instance();

0 commit comments

Comments
 (0)