Skip to content

Commit decb2cf

Browse files
PavelPashovnkaradzhov
authored andcommitted
test: add E2E test infrastructure for Redis maintenance scenarios
1 parent ea4a5cd commit decb2cf

File tree

4 files changed

+386
-0
lines changed

4 files changed

+386
-0
lines changed

packages/client/lib/client/enterprise-maintenance-manager.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { lookup } from "dns/promises";
66
import assert from "node:assert";
77
import { setTimeout } from "node:timers/promises";
88
import RedisSocket from "./socket";
9+
import diagnostics_channel from "node:diagnostics_channel";
910

1011
export const MAINTENANCE_EVENTS = {
1112
PAUSE_WRITING: "pause-writing",
@@ -21,11 +22,24 @@ const PN = {
2122
FAILED_OVER: "FAILED_OVER",
2223
};
2324

25+
export type DiagnosticsEvent = {
26+
type: string;
27+
timestamp: number;
28+
data?: Object;
29+
};
30+
2431
export const dbgMaintenance = (...args: any[]) => {
2532
if (!process.env.DEBUG_MAINTENANCE) return;
2633
return console.log("[MNT]", ...args);
2734
};
2835

36+
export const emitDiagnostics = (event: DiagnosticsEvent) => {
37+
if (!process.env.EMIT_DIAGNOSTICS) return;
38+
39+
const channel = diagnostics_channel.channel("redis.maintenance");
40+
channel.publish(event);
41+
};
42+
2943
export interface MaintenanceUpdate {
3044
relaxedCommandTimeout?: number;
3145
relaxedSocketTimeout?: number;
@@ -113,18 +127,34 @@ export default class EnterpriseMaintenanceManager {
113127
const afterSeconds = push[2];
114128
const url: string | null = push[3] ? String(push[3]) : null;
115129
dbgMaintenance("Received MOVING:", afterSeconds, url);
130+
emitDiagnostics({
131+
type: PN.MOVING,
132+
timestamp: Date.now(),
133+
data: {
134+
afterSeconds,
135+
url,
136+
},
137+
});
116138
this.#onMoving(afterSeconds, url);
117139
return true;
118140
}
119141
case PN.MIGRATING:
120142
case PN.FAILING_OVER: {
121143
dbgMaintenance("Received MIGRATING|FAILING_OVER");
144+
emitDiagnostics({
145+
type: PN.MIGRATING,
146+
timestamp: Date.now(),
147+
});
122148
this.#onMigrating();
123149
return true;
124150
}
125151
case PN.MIGRATED:
126152
case PN.FAILED_OVER: {
127153
dbgMaintenance("Received MIGRATED|FAILED_OVER");
154+
emitDiagnostics({
155+
type: PN.MIGRATED,
156+
timestamp: Date.now(),
157+
});
128158
this.#onMigrated();
129159
return true;
130160
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
import { setTimeout } from "node:timers/promises";
2+
3+
export type ActionType =
4+
| "dmc_restart"
5+
| "failover"
6+
| "reshard"
7+
| "sequence_of_actions"
8+
| "network_failure"
9+
| "execute_rlutil_command"
10+
| "execute_rladmin_command"
11+
| "migrate"
12+
| "bind";
13+
14+
export interface ActionRequest {
15+
type: ActionType;
16+
parameters?: {
17+
bdb_id?: string;
18+
[key: string]: unknown;
19+
};
20+
}
21+
22+
export interface ActionStatus {
23+
status: string;
24+
error: unknown;
25+
output: string;
26+
}
27+
28+
export class FaultInjectorClient {
29+
private baseUrl: string;
30+
#fetch: typeof fetch;
31+
32+
constructor(baseUrl: string, fetchImpl: typeof fetch = fetch) {
33+
this.baseUrl = baseUrl.replace(/\/+$/, ""); // trim trailing slash
34+
this.#fetch = fetchImpl;
35+
}
36+
37+
/**
38+
* Lists all available actions.
39+
* @throws {Error} When the HTTP request fails or response cannot be parsed as JSON
40+
*/
41+
public listActions<T = unknown>(): Promise<T> {
42+
return this.#request<T>("GET", "/action");
43+
}
44+
45+
/**
46+
* Triggers a specific action.
47+
* @param action The action request to trigger
48+
* @throws {Error} When the HTTP request fails or response cannot be parsed as JSON
49+
*/
50+
public triggerAction<T = unknown>(action: ActionRequest): Promise<T> {
51+
return this.#request<T>("POST", "/action", action);
52+
}
53+
54+
/**
55+
* Gets the status of a specific action.
56+
* @param actionId The ID of the action to check
57+
* @throws {Error} When the HTTP request fails or response cannot be parsed as JSON
58+
*/
59+
public getActionStatus<T = ActionStatus>(actionId: string): Promise<T> {
60+
return this.#request<T>("GET", `/action/${actionId}`);
61+
}
62+
63+
/**
64+
* Executes an rladmin command.
65+
* @param command The rladmin command to execute
66+
* @param bdbId Optional database ID to target
67+
* @throws {Error} When the HTTP request fails or response cannot be parsed as JSON
68+
*/
69+
public executeRladminCommand<T = unknown>(
70+
command: string,
71+
bdbId?: string
72+
): Promise<T> {
73+
const cmd = bdbId ? `rladmin -b ${bdbId} ${command}` : `rladmin ${command}`;
74+
return this.#request<T>("POST", "/rladmin", cmd);
75+
}
76+
77+
/**
78+
* Waits for an action to complete.
79+
* @param actionId The ID of the action to wait for
80+
* @param options Optional timeout and max wait time
81+
* @throws {Error} When the action does not complete within the max wait time
82+
*/
83+
public async waitForAction(
84+
actionId: string,
85+
{
86+
timeoutMs,
87+
maxWaitTimeMs,
88+
}: {
89+
timeoutMs?: number;
90+
maxWaitTimeMs?: number;
91+
} = {}
92+
): Promise<ActionStatus> {
93+
const timeout = timeoutMs || 1000;
94+
const maxWaitTime = maxWaitTimeMs || 60000;
95+
96+
const startTime = Date.now();
97+
98+
while (Date.now() - startTime < maxWaitTime) {
99+
const action = await this.getActionStatus<ActionStatus>(actionId);
100+
101+
if (["finished", "failed", "success"].includes(action.status)) {
102+
return action;
103+
}
104+
105+
await setTimeout(timeout);
106+
}
107+
108+
throw new Error(`Timeout waiting for action ${actionId}`);
109+
}
110+
111+
async #request<T>(
112+
method: string,
113+
path: string,
114+
body?: Object | string
115+
): Promise<T> {
116+
const url = `${this.baseUrl}${path}`;
117+
const headers: Record<string, string> = {
118+
"Content-Type": "application/json",
119+
};
120+
121+
let payload: string | undefined;
122+
123+
if (body) {
124+
if (typeof body === "string") {
125+
headers["Content-Type"] = "text/plain";
126+
payload = body;
127+
} else {
128+
headers["Content-Type"] = "application/json";
129+
payload = JSON.stringify(body);
130+
}
131+
}
132+
133+
const response = await this.#fetch(url, { method, headers, body: payload });
134+
135+
if (!response.ok) {
136+
try {
137+
const text = await response.text();
138+
throw new Error(`HTTP ${response.status} - ${text}`);
139+
} catch {
140+
throw new Error(`HTTP ${response.status}`);
141+
}
142+
}
143+
144+
try {
145+
return (await response.json()) as T;
146+
} catch {
147+
throw new Error(
148+
`HTTP ${response.status} - Unable to parse response as JSON`
149+
);
150+
}
151+
}
152+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
import assert from "node:assert";
2+
import diagnostics_channel from "node:diagnostics_channel";
3+
import { FaultInjectorClient } from "./fault-injector-client";
4+
import {
5+
getDatabaseConfig,
6+
getDatabaseConfigFromEnv,
7+
getEnvConfig,
8+
RedisConnectionConfig,
9+
} from "./test-scenario.util";
10+
import { createClient } from "../../..";
11+
import { DiagnosticsEvent } from "../../client/enterprise-maintenance-manager";
12+
import { before } from "mocha";
13+
14+
describe("Push Notifications", () => {
15+
const diagnosticsLog: DiagnosticsEvent[] = [];
16+
17+
const onMessageHandler = (message: unknown) => {
18+
diagnosticsLog.push(message as DiagnosticsEvent);
19+
};
20+
21+
let clientConfig: RedisConnectionConfig;
22+
let client: ReturnType<typeof createClient<any, any, any, 3>>;
23+
let faultInjectorClient: FaultInjectorClient;
24+
25+
before(() => {
26+
const envConfig = getEnvConfig();
27+
const redisConfig = getDatabaseConfigFromEnv(
28+
envConfig.redisEndpointsConfigPath
29+
);
30+
31+
faultInjectorClient = new FaultInjectorClient(envConfig.faultInjectorUrl);
32+
clientConfig = getDatabaseConfig(redisConfig);
33+
});
34+
35+
beforeEach(async () => {
36+
diagnosticsLog.length = 0;
37+
diagnostics_channel.subscribe("redis.maintenance", onMessageHandler);
38+
39+
client = createClient({
40+
socket: {
41+
host: clientConfig.host,
42+
port: clientConfig.port,
43+
...(clientConfig.tls === true ? { tls: true } : {}),
44+
},
45+
password: clientConfig.password,
46+
username: clientConfig.username,
47+
RESP: 3,
48+
maintPushNotifications: "auto",
49+
maintMovingEndpointType: "external-ip",
50+
maintRelaxedCommandTimeout: 10000,
51+
maintRelaxedSocketTimeout: 10000,
52+
});
53+
54+
client.on("error", (err: Error) => {
55+
throw new Error(`Client error: ${err.message}`);
56+
});
57+
58+
await client.connect();
59+
});
60+
61+
afterEach(() => {
62+
diagnostics_channel.unsubscribe("redis.maintenance", onMessageHandler);
63+
client.destroy();
64+
});
65+
66+
it("should receive MOVING, MIGRATING, and MIGRATED push notifications", async () => {
67+
const { action_id: migrateActionId } =
68+
await faultInjectorClient.triggerAction<{ action_id: string }>({
69+
type: "migrate",
70+
parameters: {
71+
cluster_index: "0",
72+
},
73+
});
74+
75+
await faultInjectorClient.waitForAction(migrateActionId);
76+
77+
const { action_id: bindActionId } =
78+
await faultInjectorClient.triggerAction<{ action_id: string }>({
79+
type: "bind",
80+
parameters: {
81+
cluster_index: "0",
82+
bdb_id: `${clientConfig.bdbId}`,
83+
},
84+
});
85+
86+
await faultInjectorClient.waitForAction(bindActionId);
87+
88+
const pushNotificationLogs = diagnosticsLog.filter((log) => {
89+
return ["MOVING", "MIGRATING", "MIGRATED"].includes(log?.type);
90+
});
91+
92+
assert.strictEqual(pushNotificationLogs.length, 3);
93+
});
94+
});

0 commit comments

Comments
 (0)