Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
67bf999
feat(errors): Add specialized timeout error types for maintenance sce…
nkaradzhov Aug 14, 2025
4698eee
feat(linked-list): Add EmptyAwareSinglyLinkedList and enhance DoublyL…
nkaradzhov Aug 14, 2025
34ba9fb
refactor(commands-queue): Improve push notification handling
nkaradzhov Aug 14, 2025
5fe7597
feat(commands-queue): Add method to wait for in-flight commands to co…
nkaradzhov Aug 14, 2025
14306ff
feat(commands-queue): Introduce maintenance mode support for commands…
nkaradzhov Aug 14, 2025
01cc913
refator(client): Extract socket event listener setup into helper method
nkaradzhov Aug 14, 2025
dd4b4c2
refactor(socket): Add maintenance mode support and dynamic timeout ha…
nkaradzhov Aug 14, 2025
0af1c1d
feat(client): Add Redis Enterprise maintenance configuration options
nkaradzhov Aug 14, 2025
764f68b
feat(client): Add socket helpers and pause mechanism
nkaradzhov Aug 14, 2025
e47b394
feat(client): Add Redis Enterprise maintenance handling capabilities
nkaradzhov Aug 28, 2025
a23cc40
chore: various small improvements
nkaradzhov Aug 15, 2025
411d8ea
refactor(timeouts): remove redundant flag
nkaradzhov Aug 15, 2025
312c6d3
fix: try to schedule write upon unpausing
nkaradzhov Aug 18, 2025
ea4a5cd
fix: update url for new client if present
nkaradzhov Aug 22, 2025
decb2cf
test: add E2E test infrastructure for Redis maintenance scenarios
PavelPashov Aug 25, 2025
8042c40
refactor: improve enterprise manager push notification handling
PavelPashov Aug 26, 2025
a943428
test: add E2E tests for Redis Enterprise maintenance timeout handling…
PavelPashov Aug 29, 2025
0dfac7e
test: small fixes e2e tests (#5)
PavelPashov Aug 29, 2025
a4d99f5
refactor(test): improve e2e test infrastructure for maintenance scena…
PavelPashov Sep 1, 2025
9da4420
test: add connection handoff test
nkaradzhov Sep 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 84 additions & 27 deletions packages/client/lib/client/commands-queue.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList } from './linked-list';
import { DoublyLinkedNode, DoublyLinkedList, EmptyAwareSinglyLinkedList } from './linked-list';
import encodeCommand from '../RESP/encoder';
import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder';
import { TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types';
import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub';
import { AbortError, ErrorReply, TimeoutError } from '../errors';
import { AbortError, ErrorReply, CommandTimeoutDuringMaintananceError, TimeoutError } from '../errors';
import { MonitorCallback } from '.';
import { dbgMaintenance } from './enterprise-maintenance-manager';

export interface CommandOptions<T = TypeMapping> {
chainId?: symbol;
Expand All @@ -30,6 +31,7 @@ export interface CommandToWrite extends CommandWaitingForReply {
timeout: {
signal: AbortSignal;
listener: () => unknown;
originalTimeout: number | undefined;
} | undefined;
}

Expand All @@ -50,22 +52,74 @@ const RESP2_PUSH_TYPE_MAPPING = {
[RESP_TYPES.SIMPLE_STRING]: Buffer
};

// Try to handle a push notification. Return whether you
// successfully consumed the notification or not. This is
// important in order for the queue to be able to pass the
// notification to another handler if the current one did not
// succeed.
type PushHandler = (pushItems: Array<any>) => boolean;

export default class RedisCommandsQueue {
readonly #respVersion;
readonly #maxLength;
readonly #toWrite = new DoublyLinkedList<CommandToWrite>();
readonly #waitingForReply = new SinglyLinkedList<CommandWaitingForReply>();
readonly #waitingForReply = new EmptyAwareSinglyLinkedList<CommandWaitingForReply>();
readonly #onShardedChannelMoved;
#chainInExecution: symbol | undefined;
readonly decoder;
readonly #pubSub = new PubSub();

#pushHandlers: PushHandler[] = [this.#onPush.bind(this)];

#maintenanceCommandTimeout: number | undefined

setMaintenanceCommandTimeout(ms: number | undefined) {
// Prevent possible api misuse
if (this.#maintenanceCommandTimeout === ms) {
dbgMaintenance(`Queue already set maintenanceCommandTimeout to ${ms}, skipping`);
return;
};

dbgMaintenance(`Setting maintenance command timeout to ${ms}`);
this.#maintenanceCommandTimeout = ms;

if(this.#maintenanceCommandTimeout === undefined) {
dbgMaintenance(`Queue will keep maintenanceCommandTimeout for exisitng commands, just to be on the safe side. New commands will receive normal timeouts`);
return;
}

let counter = 0;
const total = this.#toWrite.length;

// Overwrite timeouts of all eligible toWrite commands
for(const node of this.#toWrite.nodes()) {
const command = node.value;

// Remove timeout listener if it exists
RedisCommandsQueue.#removeTimeoutListener(command)

counter++;
const newTimeout = this.#maintenanceCommandTimeout;

// Overwrite the command's timeout
const signal = AbortSignal.timeout(newTimeout);
command.timeout = {
signal,
listener: () => {
this.#toWrite.remove(node);
command.reject(new CommandTimeoutDuringMaintananceError(newTimeout));
},
originalTimeout: command.timeout?.originalTimeout
};
signal.addEventListener('abort', command.timeout.listener, { once: true });
};
dbgMaintenance(`Total of ${counter} of ${total} timeouts reset to ${ms}`);
}

get isPubSubActive() {
return this.#pubSub.isActive;
}

#invalidateCallback?: (key: RedisArgument | null) => unknown;

constructor(
respVersion: RespVersions,
maxLength: number | null | undefined,
Expand Down Expand Up @@ -107,6 +161,7 @@ export default class RedisCommandsQueue {
}
return true;
}
return false
}

#getTypeMapping() {
Expand All @@ -119,30 +174,27 @@ export default class RedisCommandsQueue {
onErrorReply: err => this.#onErrorReply(err),
//TODO: we can shave off a few cycles by not adding onPush handler at all if CSC is not used
onPush: push => {
if (!this.#onPush(push)) {
// currently only supporting "invalidate" over RESP3 push messages
switch (push[0].toString()) {
case "invalidate": {
if (this.#invalidateCallback) {
if (push[1] !== null) {
for (const key of push[1]) {
this.#invalidateCallback(key);
}
} else {
this.#invalidateCallback(null);
}
}
break;
}
}
for(const pushHandler of this.#pushHandlers) {
if(pushHandler(push)) return
}
},
getTypeMapping: () => this.#getTypeMapping()
});
}

setInvalidateCallback(callback?: (key: RedisArgument | null) => unknown) {
this.#invalidateCallback = callback;
addPushHandler(handler: PushHandler): void {
this.#pushHandlers.push(handler);
}

async waitForInflightCommandsToComplete(): Promise<void> {
// In-flight commands already completed
if(this.#waitingForReply.length === 0) {
return
};
// Otherwise wait for in-flight commands to fire `empty` event
return new Promise(resolve => {
this.#waitingForReply.events.on('empty', resolve)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's theoretically possible for this promise to never resolve

});
}

addCommand<T>(
Expand All @@ -168,15 +220,20 @@ export default class RedisCommandsQueue {
typeMapping: options?.typeMapping
};

const timeout = options?.timeout;
// If #maintenanceCommandTimeout was explicitly set, we should
// use it instead of the timeout provided by the command
const timeout = this.#maintenanceCommandTimeout ?? options?.timeout;
const wasInMaintenance = this.#maintenanceCommandTimeout !== undefined;
if (timeout) {

const signal = AbortSignal.timeout(timeout);
value.timeout = {
signal,
listener: () => {
this.#toWrite.remove(node);
value.reject(new TimeoutError());
}
value.reject(wasInMaintenance ? new CommandTimeoutDuringMaintananceError(timeout) : new TimeoutError());
},
originalTimeout: options?.timeout
};
signal.addEventListener('abort', value.timeout.listener, { once: true });
}
Expand Down Expand Up @@ -432,7 +489,7 @@ export default class RedisCommandsQueue {
}

static #removeTimeoutListener(command: CommandToWrite) {
command.timeout!.signal.removeEventListener('abort', command.timeout!.listener);
command.timeout?.signal.removeEventListener('abort', command.timeout!.listener);
}

static #flushToWrite(toBeSent: CommandToWrite, err: Error) {
Expand Down
Loading
Loading