Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 109 additions & 6 deletions src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.async.*;
import io.lettuce.core.array.*;
import io.lettuce.core.bf.BfInfoValue;
import io.lettuce.core.bf.BfScanDumpValue;
import io.lettuce.core.bf.arguments.BfInsertArgs;
import io.lettuce.core.bf.arguments.BfReserveArgs;
import io.lettuce.core.probabilistic.BfInfoValue;
import io.lettuce.core.probabilistic.arguments.BfInsertArgs;
import io.lettuce.core.probabilistic.arguments.BfReserveArgs;
import io.lettuce.core.probabilistic.CfInfoValue;
import io.lettuce.core.probabilistic.ScanDumpValue;
import io.lettuce.core.probabilistic.arguments.CfInsertArgs;
import io.lettuce.core.probabilistic.arguments.CfReserveArgs;
import io.lettuce.core.cluster.PipelinedRedisFuture;
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
import io.lettuce.core.cluster.models.partitions.ClusterPartitionParser;
Expand Down Expand Up @@ -116,7 +119,7 @@ public abstract class AbstractRedisAsyncCommands<K, V> implements RedisAclAsyncC
RedisHLLAsyncCommands<K, V>, BaseRedisAsyncCommands<K, V>, RedisTransactionalAsyncCommands<K, V>,
RedisGeoAsyncCommands<K, V>, RedisClusterAsyncCommands<K, V>, RedisJsonAsyncCommands<K, V>,
RedisVectorSetAsyncCommands<K, V>, RediSearchAsyncCommands<K, V>, RedisArrayAsyncCommands<K, V>,
RedisBloomFilterAsyncCommands<K, V> {
RedisBloomFilterAsyncCommands<K, V>, RedisCuckooFilterAsyncCommands<K, V> {

private final StatefulConnection<K, V> connection;

Expand All @@ -132,6 +135,8 @@ public abstract class AbstractRedisAsyncCommands<K, V> implements RedisAclAsyncC

private final RedisBloomFilterCommandBuilder<K, V> bloomFilterCommandBuilder;

private final RedisCuckooFilterCommandBuilder<K, V> cuckooFilterCommandBuilder;

private final Supplier<JsonParser> parser;

/**
Expand All @@ -151,6 +156,7 @@ public AbstractRedisAsyncCommands(StatefulConnection<K, V> connection, RedisCode
this.searchCommandBuilder = new RediSearchCommandBuilder<>(codec);
this.arrayCommandBuilder = new RedisArrayCommandBuilder<>(codec);
this.bloomFilterCommandBuilder = new RedisBloomFilterCommandBuilder<>(codec);
this.cuckooFilterCommandBuilder = new RedisCuckooFilterCommandBuilder<>(codec);
}

/**
Expand Down Expand Up @@ -4293,8 +4299,105 @@ public RedisFuture<String> bfReserve(K key, double errorRate, long capacity, BfR
}

@Override
public RedisFuture<BfScanDumpValue> bfScanDump(K key, long iterator) {
public RedisFuture<ScanDumpValue> bfScanDump(K key, long iterator) {
return dispatch(bloomFilterCommandBuilder.bfScanDump(key, iterator));
}

// --- Redis Cuckoo Filter Commands ---

@Override
public RedisFuture<String> cfReserve(K key, long capacity) {
return dispatch(cuckooFilterCommandBuilder.cfReserve(key, capacity));
}

@Override
public RedisFuture<String> cfReserve(K key, long capacity, CfReserveArgs args) {
return dispatch(cuckooFilterCommandBuilder.cfReserve(key, capacity, args));
}

@Override
public RedisFuture<Boolean> cfAdd(K key, V value) {
return dispatch(cuckooFilterCommandBuilder.cfAdd(key, value));
}

@Override
public RedisFuture<Boolean> cfAddNx(K key, V value) {
return dispatch(cuckooFilterCommandBuilder.cfAddNx(key, value));
}

@Override
public RedisFuture<List<Boolean>> cfInsert(K key, V value) {
return dispatch(cuckooFilterCommandBuilder.cfInsert(key, value));
}

@Override
public RedisFuture<List<Boolean>> cfInsert(K key, CfInsertArgs args, V value) {
return dispatch(cuckooFilterCommandBuilder.cfInsert(key, args, value));
}

@Override
public RedisFuture<List<Boolean>> cfInsert(K key, V... values) {
return dispatch(cuckooFilterCommandBuilder.cfInsert(key, values));
}

@Override
public RedisFuture<List<Boolean>> cfInsert(K key, CfInsertArgs args, V... values) {
return dispatch(cuckooFilterCommandBuilder.cfInsert(key, args, values));
}

@Override
public RedisFuture<List<Long>> cfInsertNx(K key, V value) {
return dispatch(cuckooFilterCommandBuilder.cfInsertNx(key, value));
}

@Override
public RedisFuture<List<Long>> cfInsertNx(K key, CfInsertArgs args, V value) {
return dispatch(cuckooFilterCommandBuilder.cfInsertNx(key, args, value));
}

@Override
public RedisFuture<List<Long>> cfInsertNx(K key, V... values) {
return dispatch(cuckooFilterCommandBuilder.cfInsertNx(key, values));
}

@Override
public RedisFuture<List<Long>> cfInsertNx(K key, CfInsertArgs args, V... values) {
return dispatch(cuckooFilterCommandBuilder.cfInsertNx(key, args, values));
}

@Override
public RedisFuture<Boolean> cfExists(K key, V value) {
return dispatch(cuckooFilterCommandBuilder.cfExists(key, value));
}

@Override
public RedisFuture<List<Boolean>> cfMExists(K key, V... values) {
return dispatch(cuckooFilterCommandBuilder.cfMExists(key, values));
}

@Override
public RedisFuture<Boolean> cfDel(K key, V value) {
return dispatch(cuckooFilterCommandBuilder.cfDel(key, value));
}

@Override
public RedisFuture<Long> cfCount(K key, V value) {
return dispatch(cuckooFilterCommandBuilder.cfCount(key, value));
}

@Override
public RedisFuture<ScanDumpValue> cfScanDump(K key, long cursor) {
return dispatch(cuckooFilterCommandBuilder.cfScanDump(key, cursor));
}

@Override
public RedisFuture<String> cfLoadChunk(K key, long cursor, byte[] data) {
return dispatch(cuckooFilterCommandBuilder.cfLoadChunk(key, cursor, data));
}

@Override
public RedisFuture<CfInfoValue> cfInfo(K key) {
return dispatch(cuckooFilterCommandBuilder.cfInfo(key));
}

}
128 changes: 116 additions & 12 deletions src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.reactive.*;
import io.lettuce.core.array.*;
import io.lettuce.core.bf.BfInfoValue;
import io.lettuce.core.bf.BfScanDumpValue;
import io.lettuce.core.bf.arguments.BfInsertArgs;
import io.lettuce.core.bf.arguments.BfReserveArgs;
import io.lettuce.core.probabilistic.BfInfoValue;
import io.lettuce.core.probabilistic.arguments.BfInsertArgs;
import io.lettuce.core.probabilistic.arguments.BfReserveArgs;
import io.lettuce.core.probabilistic.CfInfoValue;
import io.lettuce.core.probabilistic.ScanDumpValue;
import io.lettuce.core.probabilistic.arguments.CfInsertArgs;
import io.lettuce.core.probabilistic.arguments.CfReserveArgs;
import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands;
import io.lettuce.core.cluster.models.partitions.ClusterPartitionParser;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
Expand Down Expand Up @@ -115,13 +118,14 @@
* @author dae won
* @since 4.0
*/
public abstract class AbstractRedisReactiveCommands<K, V> implements RedisAclReactiveCommands<K, V>,
RedisHashReactiveCommands<K, V>, RedisKeyReactiveCommands<K, V>, RedisStringReactiveCommands<K, V>,
RedisListReactiveCommands<K, V>, RedisSetReactiveCommands<K, V>, RedisSortedSetReactiveCommands<K, V>,
RedisScriptingReactiveCommands<K, V>, RedisServerReactiveCommands<K, V>, RedisHLLReactiveCommands<K, V>,
BaseRedisReactiveCommands<K, V>, RedisTransactionalReactiveCommands<K, V>, RedisGeoReactiveCommands<K, V>,
RedisClusterReactiveCommands<K, V>, RedisJsonReactiveCommands<K, V>, RedisVectorSetReactiveCommands<K, V>,
RediSearchReactiveCommands<K, V>, RedisArrayReactiveCommands<K, V>, RedisBloomFilterReactiveCommands<K, V> {
public abstract class AbstractRedisReactiveCommands<K, V>
implements RedisAclReactiveCommands<K, V>, RedisHashReactiveCommands<K, V>, RedisKeyReactiveCommands<K, V>,
RedisStringReactiveCommands<K, V>, RedisListReactiveCommands<K, V>, RedisSetReactiveCommands<K, V>,
RedisSortedSetReactiveCommands<K, V>, RedisScriptingReactiveCommands<K, V>, RedisServerReactiveCommands<K, V>,
RedisHLLReactiveCommands<K, V>, BaseRedisReactiveCommands<K, V>, RedisTransactionalReactiveCommands<K, V>,
RedisGeoReactiveCommands<K, V>, RedisClusterReactiveCommands<K, V>, RedisJsonReactiveCommands<K, V>,
RedisVectorSetReactiveCommands<K, V>, RediSearchReactiveCommands<K, V>, RedisArrayReactiveCommands<K, V>,
RedisBloomFilterReactiveCommands<K, V>, RedisCuckooFilterReactiveCommands<K, V> {

private final StatefulConnection<K, V> connection;

Expand All @@ -137,6 +141,8 @@ public abstract class AbstractRedisReactiveCommands<K, V> implements RedisAclRea

private final RedisBloomFilterCommandBuilder<K, V> bloomFilterCommandBuilder;

private final RedisCuckooFilterCommandBuilder<K, V> cuckooFilterCommandBuilder;

private final Supplier<JsonParser> parser;

private final ClientResources clientResources;
Expand All @@ -162,6 +168,7 @@ public AbstractRedisReactiveCommands(StatefulConnection<K, V> connection, RedisC
this.searchCommandBuilder = new RediSearchCommandBuilder<>(codec);
this.arrayCommandBuilder = new RedisArrayCommandBuilder<>(codec);
this.bloomFilterCommandBuilder = new RedisBloomFilterCommandBuilder<>(codec);
this.cuckooFilterCommandBuilder = new RedisCuckooFilterCommandBuilder<>(codec);
this.clientResources = connection.getResources();
this.tracingEnabled = clientResources.tracing().isEnabled();
}
Expand Down Expand Up @@ -4340,8 +4347,105 @@ public Mono<String> bfReserve(K key, double errorRate, long capacity, BfReserveA
}

@Override
public Mono<BfScanDumpValue> bfScanDump(K key, long iterator) {
public Mono<ScanDumpValue> bfScanDump(K key, long iterator) {
return createMono(() -> bloomFilterCommandBuilder.bfScanDump(key, iterator));
}

// --- Redis Cuckoo Filter Commands ---

@Override
public Mono<String> cfReserve(K key, long capacity) {
return createMono(() -> cuckooFilterCommandBuilder.cfReserve(key, capacity));
}

@Override
public Mono<String> cfReserve(K key, long capacity, CfReserveArgs args) {
return createMono(() -> cuckooFilterCommandBuilder.cfReserve(key, capacity, args));
}

@Override
public Mono<Boolean> cfAdd(K key, V value) {
return createMono(() -> cuckooFilterCommandBuilder.cfAdd(key, value));
}

@Override
public Mono<Boolean> cfAddNx(K key, V value) {
return createMono(() -> cuckooFilterCommandBuilder.cfAddNx(key, value));
}

@Override
public Flux<Boolean> cfInsert(K key, V value) {
return createDissolvingFlux(() -> cuckooFilterCommandBuilder.cfInsert(key, value));
}

@Override
public Flux<Boolean> cfInsert(K key, CfInsertArgs args, V value) {
return createDissolvingFlux(() -> cuckooFilterCommandBuilder.cfInsert(key, args, value));
}

@Override
public Flux<Boolean> cfInsert(K key, V... values) {
return createDissolvingFlux(() -> cuckooFilterCommandBuilder.cfInsert(key, values));
}

@Override
public Flux<Boolean> cfInsert(K key, CfInsertArgs args, V... values) {
return createDissolvingFlux(() -> cuckooFilterCommandBuilder.cfInsert(key, args, values));
}

@Override
public Flux<Long> cfInsertNx(K key, V value) {
return createDissolvingFlux(() -> cuckooFilterCommandBuilder.cfInsertNx(key, value));
}

@Override
public Flux<Long> cfInsertNx(K key, CfInsertArgs args, V value) {
return createDissolvingFlux(() -> cuckooFilterCommandBuilder.cfInsertNx(key, args, value));
}

@Override
public Flux<Long> cfInsertNx(K key, V... values) {
return createDissolvingFlux(() -> cuckooFilterCommandBuilder.cfInsertNx(key, values));
}

@Override
public Flux<Long> cfInsertNx(K key, CfInsertArgs args, V... values) {
return createDissolvingFlux(() -> cuckooFilterCommandBuilder.cfInsertNx(key, args, values));
}

@Override
public Mono<Boolean> cfExists(K key, V value) {
return createMono(() -> cuckooFilterCommandBuilder.cfExists(key, value));
}

@Override
public Flux<Boolean> cfMExists(K key, V... values) {
return createDissolvingFlux(() -> cuckooFilterCommandBuilder.cfMExists(key, values));
}

@Override
public Mono<Boolean> cfDel(K key, V value) {
return createMono(() -> cuckooFilterCommandBuilder.cfDel(key, value));
}

@Override
public Mono<Long> cfCount(K key, V value) {
return createMono(() -> cuckooFilterCommandBuilder.cfCount(key, value));
}

@Override
public Mono<ScanDumpValue> cfScanDump(K key, long cursor) {
return createMono(() -> cuckooFilterCommandBuilder.cfScanDump(key, cursor));
}

@Override
public Mono<String> cfLoadChunk(K key, long cursor, byte[] data) {
return createMono(() -> cuckooFilterCommandBuilder.cfLoadChunk(key, cursor, data));
}

@Override
public Mono<CfInfoValue> cfInfo(K key) {
return createMono(() -> cuckooFilterCommandBuilder.cfInfo(key));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@

import java.util.List;

import io.lettuce.core.bf.BfInfoValue;
import io.lettuce.core.bf.BfInfoValueParser;
import io.lettuce.core.bf.BfScanDumpValue;
import io.lettuce.core.bf.BfScanDumpValueParser;
import io.lettuce.core.bf.arguments.BfInsertArgs;
import io.lettuce.core.bf.arguments.BfReserveArgs;
import io.lettuce.core.probabilistic.BfInfoValue;
import io.lettuce.core.probabilistic.BfInfoValueParser;
import io.lettuce.core.probabilistic.ScanDumpValue;
import io.lettuce.core.probabilistic.ScanDumpValueParser;
import io.lettuce.core.probabilistic.arguments.BfInsertArgs;
import io.lettuce.core.probabilistic.arguments.BfReserveArgs;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.output.*;
import io.lettuce.core.protocol.BaseRedisCommandBuilder;
Expand Down Expand Up @@ -183,12 +183,12 @@ Command<K, V, String> bfReserve(K key, double errorRate, long capacity, BfReserv
return createCommand(BF_RESERVE, new StatusOutput<>(codec), args);
}

Command<K, V, BfScanDumpValue> bfScanDump(K key, long iterator) {
Command<K, V, ScanDumpValue> bfScanDump(K key, long iterator) {
notNullKey(key);

CommandArgs<K, V> args = new CommandArgs<>(codec).addKey(key).add(iterator);

return createCommand(BF_SCANDUMP, new EncodedComplexOutput<>(codec, BfScanDumpValueParser.INSTANCE), args);
return createCommand(BF_SCANDUMP, new EncodedComplexOutput<>(codec, ScanDumpValueParser.INSTANCE), args);
}

}
Loading
Loading