Skip to content

Commit 03e7571

Browse files
committed
feat(analytics): enhance Redis key migration and health monitoring
- Added a backward compatibility helper for Redis keys to facilitate migration from old to new formats. - Improved Redis retry strategies with logging for better debugging. - Implemented health monitoring for Redis connections, including automatic client rebuilding after consecutive errors. - Updated Redis options for better connection handling and timeouts.
1 parent c4eba19 commit 03e7571

File tree

1 file changed

+122
-10
lines changed

1 file changed

+122
-10
lines changed

packages/sdk-socket-server-next/src/analytics-api.ts

Lines changed: 122 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import {
2424
incrementAnalyticsError,
2525
incrementAnalyticsEvents,
2626
incrementRedisCacheOperation,
27+
incrementKeyMigration,
2728
} from './metrics';
2829
import genericPool from "generic-pool";
2930

@@ -72,10 +73,21 @@ export const getRedisOptions = (
7273
connectTimeout: 30000,
7374
keepAlive: 369,
7475
maxRetriesPerRequest: 4,
75-
retryStrategy: (times) => Math.min(times * 30, 1000),
76+
retryStrategy: (times) => {
77+
const delay = Math.min(times * 30, 1000);
78+
logger.info(`Redis retry attempt ${times} with delay ${delay}ms`);
79+
return delay;
80+
},
7681
reconnectOnError: (error) => {
77-
// eslint-disable-next-line require-unicode-regexp
78-
const targetErrors = [/MOVED/, /READONLY/, /ETIMEDOUT/];
82+
const targetErrors = [
83+
/MOVED/,
84+
/READONLY/,
85+
/ETIMEDOUT/,
86+
/ECONNRESET/,
87+
/ECONNREFUSED/,
88+
/EPIPE/,
89+
/ENOTFOUND/,
90+
];
7991

8092
logger.error('Redis reconnect error:', error);
8193
return targetErrors.some((targetError) =>
@@ -105,9 +117,25 @@ export const buildRedisClient = (usePipelining: boolean = true) => {
105117
slotsRefreshTimeout: 5000,
106118
showFriendlyErrorStack: true,
107119
slotsRefreshInterval: 2000,
108-
clusterRetryStrategy: (times) => Math.min(times * 30, 1000),
120+
natMap: process.env.REDIS_NAT_MAP ? JSON.parse(process.env.REDIS_NAT_MAP) : undefined,
121+
redisOptions: {
122+
...redisOptions,
123+
// Queues commands when disconnected from Redis, executing them when connection is restored
124+
// This prevents data loss during network issues or cluster topology changes
125+
offlineQueue: true,
126+
// Default is 10000ms (10s). Increasing this allows more time to establish
127+
// connection during network instability while balancing real-time requirements
128+
connectTimeout: 10000,
129+
// Default is no timeout. Setting to 5000ms prevents hanging commands
130+
// while still allowing reasonable time for completion
131+
commandTimeout: 5000,
132+
},
133+
clusterRetryStrategy: (times) => {
134+
const delay = Math.min(times * 100, 3000);
135+
logger.info(`Redis Cluster retry attempt ${times} with delay ${delay}ms`);
136+
return delay;
137+
},
109138
enableAutoPipelining: usePipelining,
110-
redisOptions,
111139
};
112140

113141
logger.debug(
@@ -179,6 +207,11 @@ export const pubClient = getGlobalRedisClient();
179207
export const pubClientPool = genericPool.createPool(redisFactory, {
180208
max: 35,
181209
min: 15,
210+
acquireTimeoutMillis: 5000,
211+
idleTimeoutMillis: 30000,
212+
evictionRunIntervalMillis: 15000,
213+
numTestsPerEvictionRun: 3,
214+
softIdleTimeoutMillis: 10000,
182215
});
183216

184217
const app = express();
@@ -260,6 +293,28 @@ app.post('/debug', (req, _res, next) => {
260293
next(); // Pass control to the next handler (which will be /evt)
261294
});
262295

296+
// Add Redis key backward compatibility helper
297+
const getWithBackwardCompatibility = async ({
298+
newKey,
299+
oldKey,
300+
}: {
301+
newKey: string;
302+
oldKey: string;
303+
}) => {
304+
let value = await pubClient.get(newKey);
305+
if (!value) {
306+
// Try old key format if new key returns nothing
307+
value = await pubClient.get(oldKey);
308+
if (value) {
309+
// If found with old key, migrate to new format
310+
await pubClient.set(newKey, value, 'EX', config.channelExpiry.toString());
311+
incrementKeyMigration({ migrationType: 'channel-id' });
312+
logger.info(`Migrated key from ${oldKey} to ${newKey}`);
313+
}
314+
}
315+
return value;
316+
}
317+
263318
app.post('/evt', evtMetricsMiddleware, async (_req, res) => {
264319
try {
265320
const { body } = _req;
@@ -287,7 +342,7 @@ app.post('/evt', evtMetricsMiddleware, async (_req, res) => {
287342
];
288343

289344
// Filter: drop RPC events with unallowed methods silently, let all else through
290-
if (toCheckEvents.includes(body.event) &&
345+
if (toCheckEvents.includes(body.event) &&
291346
(!body.method || !allowedMethods.includes(body.method))) {
292347
return res.json({ success: true });
293348
}
@@ -316,7 +371,10 @@ app.post('/evt', evtMetricsMiddleware, async (_req, res) => {
316371

317372
let userIdHash = isAnonUser
318373
? crypto.createHash('sha1').update(channelId).digest('hex')
319-
: await pubClient.get(channelId);
374+
: await getWithBackwardCompatibility({
375+
newKey: `{${channelId}}:id`,
376+
oldKey: channelId,
377+
});
320378

321379
incrementRedisCacheOperation('analytics-get-channel-id', !!userIdHash);
322380

@@ -327,8 +385,9 @@ app.post('/evt', evtMetricsMiddleware, async (_req, res) => {
327385
);
328386

329387
if (!isExtensionEvent) {
388+
// Always write to the new format
330389
await pubClient.set(
331-
channelId,
390+
`{${channelId}}:id`,
332391
userIdHash,
333392
'EX',
334393
config.channelExpiry.toString(),
@@ -338,12 +397,16 @@ app.post('/evt', evtMetricsMiddleware, async (_req, res) => {
338397

339398
if (REDIS_DEBUG_LOGS) {
340399
await inspectRedis(channelId);
400+
await inspectRedis(`{${channelId}}:id`);
341401
}
342402

343403
let channelInfo: ChannelInfo | null;
344404
const cachedChannelInfo = isAnonUser
345405
? null
346-
: await pubClient.get(userIdHash);
406+
: await getWithBackwardCompatibility({
407+
newKey: `{${userIdHash}}:info`,
408+
oldKey: userIdHash,
409+
});
347410

348411
incrementRedisCacheOperation(
349412
'analytics-get-channel-info',
@@ -380,8 +443,9 @@ app.post('/evt', evtMetricsMiddleware, async (_req, res) => {
380443
);
381444

382445
if (!isExtensionEvent) {
446+
// Always write to the new format
383447
await pubClient.set(
384-
userIdHash,
448+
`{${userIdHash}}:info`,
385449
JSON.stringify(channelInfo),
386450
'EX',
387451
config.channelExpiry.toString(),
@@ -391,6 +455,7 @@ app.post('/evt', evtMetricsMiddleware, async (_req, res) => {
391455

392456
if (REDIS_DEBUG_LOGS) {
393457
await inspectRedis(userIdHash);
458+
await inspectRedis(`{${userIdHash}}:info`);
394459
}
395460

396461
const event = {
@@ -464,4 +529,51 @@ app.post('/evt', evtMetricsMiddleware, async (_req, res) => {
464529
}
465530
});
466531

532+
// Add Redis health checking and recovery
533+
let redisHealthCheckInterval: NodeJS.Timeout;
534+
let consecutiveRedisErrors = 0;
535+
const MAX_CONSECUTIVE_ERRORS = 10;
536+
537+
// Add Redis health monitoring function after getGlobalRedisClient
538+
export const monitorRedisHealth = () => {
539+
if (redisHealthCheckInterval) {
540+
clearInterval(redisHealthCheckInterval);
541+
}
542+
543+
redisHealthCheckInterval = setInterval(async () => {
544+
try {
545+
// Simple ping to check Redis health
546+
await pubClient.ping();
547+
// Reset error counter on successful ping
548+
if (consecutiveRedisErrors > 0) {
549+
logger.info(`Redis health restored after ${consecutiveRedisErrors} consecutive errors`);
550+
consecutiveRedisErrors = 0;
551+
}
552+
} catch (error) {
553+
consecutiveRedisErrors++;
554+
logger.error(`Redis health check failed (${consecutiveRedisErrors}/${MAX_CONSECUTIVE_ERRORS}):`, error);
555+
556+
// If too many consecutive errors, attempt to rebuild the Redis client
557+
if (consecutiveRedisErrors >= MAX_CONSECUTIVE_ERRORS) {
558+
logger.warn(`Rebuilding Redis client after ${consecutiveRedisErrors} consecutive errors`);
559+
try {
560+
// Disconnect the old client first
561+
pubClient.disconnect();
562+
// Build a new Redis client and update the global reference
563+
redisClient = buildRedisClient();
564+
// Update the pubClient reference
565+
Object.assign(pubClient, redisClient);
566+
consecutiveRedisErrors = 0;
567+
logger.info('Redis client rebuilt successfully');
568+
} catch (rebuildError) {
569+
logger.error('Failed to rebuild Redis client:', rebuildError);
570+
}
571+
}
572+
}
573+
}, 30000); // Check every 30 seconds
574+
};
575+
576+
// Start monitoring when the module is loaded
577+
monitorRedisHealth();
578+
467579
export { analytics, app };

0 commit comments

Comments
 (0)