Skip to content

[reactor-optional] Drop 'Flux' (Reactor) dependency from RedisCredentialsProvider#3785

Open
atakavci wants to merge 9 commits into
redis:feature/reactor-optional-1from
atakavci:ali/reactorOptional-credentials-flux
Open

[reactor-optional] Drop 'Flux' (Reactor) dependency from RedisCredentialsProvider#3785
atakavci wants to merge 9 commits into
redis:feature/reactor-optional-1from
atakavci:ali/reactorOptional-credentials-flux

Conversation

@atakavci

@atakavci atakavci commented Jun 15, 2026

Copy link
Copy Markdown
Collaborator

Removes Project Reactor (Flux/Sinks/Disposable) from the streaming credentials API exposed by RedisCredentialsProvider, replacing it with a callback-based subscription model built on CompletableFuture and Consumer. The credentials-provider surface no longer references reactor-core types.

API changes

io.lettuce.core.RedisCredentialsProvider:

  • Removed: default Flux<RedisCredentials> credentials().
  • Added: default CredentialsSubscription subscribeToCredentials(Consumer<RedisCredentials> onNext, Consumer<Throwable> onError), still throwing UnsupportedOperationException when supportsStreaming() is false.
  • Added: nested CredentialsSubscription extends Closeable (non-throwing close()) as the subscription handle.
  • Javadoc notes that replay, error-replay, threading and ordering are implementation-defined.

io.lettuce.core.RedisAuthenticationHandler: subscription state moves from AtomicReference<Disposable> to AtomicReference<CredentialsSubscription>; subscribe() calls subscribeToCredentials(this::reauthenticate, this::onError) and closes any previously held subscription; unsubscribe() closes it. Internal onNext/complete helpers are dropped.

io.lettuce.core.ClientOptions: ReauthenticateBehavior javadoc updated from credentials() to subscribeToCredentials(...).

Implementation

io.lettuce.authx.TokenBasedRedisCredentialsProvider replaces Sinks.Many<RedisCredentials> with:

  • AtomicReference<CompletableFuture<RedisCredentials>> for the latest credentials (backs resolveCredentials() and subscriber replay).
  • CopyOnWriteArrayList<SimpleSubscription> for live subscribers; nested SimpleSubscription deduplicates replay vs. live delivery and self-removes on close().
  • An Executor for dispatching onNext/onError and completing the initial future.

TokenListener.onTokenRenewed updates the latest future and fans out to current subscriptions via the executor. TokenListener.onError logs, fails the pending future once (resetting the ref so later calls wait for the next success), and dispatches to current subscribers; prior errors are not replayed to later subscribers. close() sets isClosed, stops the TokenManager, fails any pending future with IllegalStateException("Credentials provider closed") and clears subscriptions; listener callbacks short-circuit when closed. resolveCredentials() returns a fresh CompletableFuture chained off the current one.

New factory overloads create(TokenAuthConfig, Executor) and create(TokenManager, Executor); the no-executor variants default to a caller-thread executor (r -> r.run()). Javadoc spells out the threading consequences and the per-subscription ordering requirement.

Documentation

docs/user-guide/connecting-redis.md: streaming-credentials example rewritten on top of subscribeToCredentials(...) with a CompletableFuture-backed latest-credentials reference and a plain listener list.

Backwards compatibility

Breaking change for any external implementer or caller of RedisCredentialsProvider#credentials(). Implementers should override subscribeToCredentials(Consumer, Consumer) (and supportsStreaming()); callers should subscribe via the consumer API and close the returned CredentialsSubscription.


Note

High Risk
This is a breaking public API change in authentication and token-renewal paths, with new threading semantics in TokenBasedRedisCredentialsProvider that can affect re-auth timing if subscribers block or use unordered executors.

Overview
Breaking API change: RedisCredentialsProvider no longer exposes Reactor types. resolveCredentials() returns CompletionStage<RedisCredentials> instead of Mono, and streaming updates move from removed credentials() to subscribeToCredentials(onNext, onError) returning a closable CredentialsSubscription.

Driver wiring: RedisAuthenticationHandler subscribes via the new callback API (replacing Flux + Disposable). ReauthenticateBehavior docs now reference subscribeToCredentials. Handshake, RedisURI, and static providers use CompletableFuture / toCompletableFuture() instead of Mono/block().

Token / Entra provider: TokenBasedRedisCredentialsProvider drops Sinks for an atomic latest-credentials future, subscriber list, and optional Executor-dispatched callbacks; new create(..., Executor) overloads document threading and ordering trade-offs.

Docs & tests: User guide sample rewritten on the callback model; tests and integration code updated accordingly. External callers/implementers of credentials() must migrate to the subscription API.

Reviewed by Cursor Bugbot for commit 2a8afaf. Bugbot is set up for automated code reviews on this repo. Configure here.

@atakavci atakavci requested review from a-TODO-rov, Copilot and ggivo June 15, 2026 14:40
@atakavci atakavci self-assigned this Jun 15, 2026
@atakavci atakavci added the type: feature A new feature label Jun 15, 2026

@cursor cursor Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 3 potential issues.

Fix All in Cursor

Reviewed by Cursor Bugbot for commit de2287f. Configure here.

CompletableFuture<RedisCredentials> previous = credentialsFutureRef.get();
if (!previous.isDone()) {
credentialsFutureRef.compareAndSet(previous, new CompletableFuture<>());
executor.execute(() -> previous.completeExceptionally(exception));

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Failed CAS still fails future

Medium Severity

In onError, compareAndSet on credentialsFutureRef is not checked, yet previous is always completed exceptionally on the executor. If renewal already swapped the ref, a stale pending future can be failed after a successful token update, so resolveCredentials waiters may see an error while the provider already holds fresh credentials.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit de2287f. Configure here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@atakavci sounds like a problem, but not sure ...

@Override
public void close() {
provider.subscriptions.remove(this);
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Closed subscriptions still get callbacks

Medium Severity

SimpleSubscription.close() only removes the subscription from the list; it does not invalidate the handle. Tasks already submitted to the executor can still run dispatchOnNext or dispatchOnError, so RedisAuthenticationHandler may re-authenticate or report errors after unsubscribe() or provider close().

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit de2287f. Configure here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@atakavci don't you think we need a volatile isClosed flag here ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SimpleSubscription subscription = new SimpleSubscription(this, onNext, onError);
subscriptions.add(subscription);
executor.execute(() -> subscription.replay(getReplayCandidate()));
return subscription;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Subscribe after close race

Low Severity

subscribeToCredentials checks isClosed once at entry, but close() can run before the subscription is added or before replay is scheduled. A subscription can remain registered after the provider is closed and still receive a replay delivery.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit de2287f. Configure here.

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR removes Project Reactor types from the public RedisCredentialsProvider API by switching credential resolution to CompletionStage and replacing the streaming Flux API with a callback-based subscription (subscribeToCredentials(…, …) returning a CredentialsSubscription). It updates core authentication/handshake code paths, the token-based credentials provider implementation, and the corresponding tests and documentation to match the new non-Reactor surface.

Changes:

  • Reworks RedisCredentialsProvider to use CompletionStage for resolveCredentials() and a consumer-based streaming API with CredentialsSubscription.
  • Updates authentication/handshake consumers (RedisAuthenticationHandler, RedisHandshake, RedisURI) and supporting providers (StaticCredentialsProvider, TokenBasedRedisCredentialsProvider) to the new model.
  • Migrates affected tests and the user guide examples away from Mono/Flux-based credential resolution and streaming.

Reviewed changes

Copilot reviewed 19 out of 19 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
src/test/java/io/lettuce/core/sentinel/SentinelAclIntegrationTests.java Updates sentinel ACL tests to use CompletableFuture-based credentials provider.
src/test/java/io/lettuce/core/RedisURIUnitTests.java Adds/updates tests for RedisURI credential-provider error unwrapping and switches to CompletionStage.
src/test/java/io/lettuce/core/RedisURIBuilderUnitTests.java Adjusts builder tests to verify credentials via Mono.fromCompletionStage(...).
src/test/java/io/lettuce/core/RedisHandshakeUnitTests.java Updates test credentials provider to return CompletionStage rather than Mono.
src/test/java/io/lettuce/core/MyStreamingRedisCredentialsProvider.java Replaces Reactor-based streaming example provider with callback subscription + CompletableFuture latest-value storage.
src/test/java/io/lettuce/core/ConnectionCommandIntegrationTests.java Updates integration test to resolve credentials via CompletionStage instead of Mono.block().
src/test/java/io/lettuce/core/cluster/RedisClusterURIUtilUnitTests.java Migrates cluster URI tests to CompletionStage credential resolution.
src/test/java/io/lettuce/core/AuthenticationIntegrationTests.java Updates custom credentials provider usage to CompletableFuture.completedFuture(...).
src/test/java/io/lettuce/authx/TokenBasedRedisCredentialsProviderTest.java Rewrites streaming tests to use the new subscription API and CompletionStage resolution.
src/test/java/io/lettuce/authx/EntraIdIntegrationTests.java Updates Entra ID renewal tests to use CredentialsSubscription and ensures cleanup via close().
src/test/java/io/lettuce/authx/DefaultAzureCredentialsIntegrationTests.java Switches token credential resolution to CompletableFuture#get with timeout.
src/main/java/io/lettuce/core/StaticCredentialsProvider.java Replaces cached Mono with a cached completed CompletableFuture.
src/main/java/io/lettuce/core/RedisURI.java Removes Reactor usage; masks credentials by synchronously joining the CompletionStage and bubbling exceptions.
src/main/java/io/lettuce/core/RedisHandshake.java Replaces Mono.toFuture() with CompletionStage.toCompletableFuture() for handshake auth.
src/main/java/io/lettuce/core/RedisCredentialsProvider.java Core public API change: CompletionStage + callback subscription model + new CredentialsSubscription.
src/main/java/io/lettuce/core/RedisAuthenticationHandler.java Replaces Reactor Disposable subscription management with CredentialsSubscription lifecycle.
src/main/java/io/lettuce/core/ClientOptions.java Updates reauthentication javadoc to reference subscribeToCredentials(...).
src/main/java/io/lettuce/authx/TokenBasedRedisCredentialsProvider.java Reimplements streaming token-based provider without Reactor; adds executor-based dispatch and subscription replay behavior.
docs/user-guide/connecting-redis.md Updates streaming credentials documentation example to the new subscription API and CompletionStage resolution.
Comments suppressed due to low confidence (1)

src/test/java/io/lettuce/core/AuthenticationIntegrationTests.java:37

  • The new CompletableFuture import breaks the local java.* import ordering (it’s inserted between java.time.* imports and java.util.*). Re-sorting keeps imports consistent and avoids style-check failures.
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +56 to +62
return () -> {
try {
return CompletableFuture.completedFuture(supplier.get());
} catch (Exception e) {
return Futures.failed(e);
}
};
Comment on lines +115 to 121
default CompletionStage<RedisCredentials> resolveCredentials() {
try {
return CompletableFuture.completedFuture(resolveCredentialsNow());
} catch (Exception e) {
return Futures.failed(e);
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@atakavci maybe we should consider null as failed Future ?

Comment on lines 147 to 151
public void unsubscribe() {
Disposable subscription = credentialsSubscription.getAndSet(null);
if (subscription != null && !subscription.isDisposed()) {
subscription.dispose();
CredentialsSubscription sub = credentialsSubscription.getAndSet(null);
if (sub != null) {
sub.close();
}
Comment on lines +35 to +37
@Override
public CredentialsSubscription subscribeToCredentials(Consumer<RedisCredentials> onNext, Consumer<Throwable> onError) {
Listener listener = new Listener(onNext, onError);
Comment on lines +23 to 25

import java.util.concurrent.CompletableFuture;

Comment on lines +70 to +92
}

@Override
public void close() {
provider.subscriptions.remove(this);
}

private void onCredentials(RedisCredentials credentials) {
RedisCredentials played = lastPlayed.getAndSet(credentials);
if (played != credentials) {
onNext.accept(credentials);
}
}

private void replay(RedisCredentials candidate) {
if (candidate != null && lastPlayed.compareAndSet(null, candidate)) {
onNext.accept(candidate);
}
}

private void onError(Throwable throwable) {
onError.accept(throwable);
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@a-TODO-rov a-TODO-rov left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Few inline comments:

Comment on lines +25 to +34
/**
* Handle to a subscription created by {@link #subscribeToCredentials(Consumer, Consumer)}. Closing the subscription stops
* the provider from delivering further credential updates to the registered consumers.
*/
interface CredentialsSubscription extends Closeable {

@Override
void close();

}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Override
public void close() {
provider.subscriptions.remove(this);
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@atakavci don't you think we need a volatile isClosed flag here ?

@Override
public void close() {
provider.subscriptions.remove(this);
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on lines +70 to +92
}

@Override
public void close() {
provider.subscriptions.remove(this);
}

private void onCredentials(RedisCredentials credentials) {
RedisCredentials played = lastPlayed.getAndSet(credentials);
if (played != credentials) {
onNext.accept(credentials);
}
}

private void replay(RedisCredentials candidate) {
if (candidate != null && lastPlayed.compareAndSet(null, candidate)) {
onNext.accept(candidate);
}
}

private void onError(Throwable throwable) {
onError.accept(throwable);
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CompletableFuture<RedisCredentials> previous = credentialsFutureRef.get();
if (!previous.isDone()) {
credentialsFutureRef.compareAndSet(previous, new CompletableFuture<>());
executor.execute(() -> previous.completeExceptionally(exception));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@atakavci sounds like a problem, but not sure ...

Comment on lines +115 to 121
default CompletionStage<RedisCredentials> resolveCredentials() {
try {
return CompletableFuture.completedFuture(resolveCredentialsNow());
} catch (Exception e) {
return Futures.failed(e);
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@atakavci maybe we should consider null as failed Future ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

type: feature A new feature

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants