Skip to content

Commit ad26cde

Browse files
authored
fix(v3): prevent saturated queues from dominating dequeue attempts by adding a cooloff period of successive failed dequeues (#2439)
* fix(v3): prevent saturated queues from dominating dequeue attempts by adding a cooloff period of successive failed dequeues * Add env vars and add a marqs shutdown on SIGTERM/INT
1 parent 5e4756f commit ad26cde

File tree

2 files changed

+81
-15
lines changed

2 files changed

+81
-15
lines changed

apps/webapp/app/env.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,8 @@ const EnvironmentSchema = z.object({
421421
MARQS_WORKER_POLL_INTERVAL_MS: z.coerce.number().int().default(100),
422422
MARQS_WORKER_IMMEDIATE_POLL_INTERVAL_MS: z.coerce.number().int().default(100),
423423
MARQS_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
424+
MARQS_SHARED_WORKER_QUEUE_COOLOFF_COUNT_THRESHOLD: z.coerce.number().int().default(10),
425+
MARQS_SHARED_WORKER_QUEUE_COOLOFF_PERIOD_MS: z.coerce.number().int().default(5_000),
424426

425427
PROD_TASK_HEARTBEAT_INTERVAL_MS: z.coerce.number().int().optional(),
426428

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 79 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { type RedisOptions } from "@internal/redis";
12
import {
23
context,
34
propagation,
@@ -8,21 +9,32 @@ import {
89
trace,
910
Tracer,
1011
} from "@opentelemetry/api";
11-
import { type RedisOptions } from "@internal/redis";
1212
import {
1313
SEMATTRS_MESSAGE_ID,
14-
SEMATTRS_MESSAGING_SYSTEM,
1514
SEMATTRS_MESSAGING_OPERATION,
15+
SEMATTRS_MESSAGING_SYSTEM,
1616
} from "@opentelemetry/semantic-conventions";
17+
import { Logger } from "@trigger.dev/core/logger";
18+
import { tryCatch } from "@trigger.dev/core/utils";
1719
import { flattenAttributes } from "@trigger.dev/core/v3";
20+
import { Worker, type WorkerConcurrencyOptions } from "@trigger.dev/redis-worker";
1821
import Redis, { type Callback, type Result } from "ioredis";
22+
import { setInterval as setIntervalAsync } from "node:timers/promises";
23+
import z from "zod";
1924
import { env } from "~/env.server";
2025
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
2126
import { logger } from "~/services/logger.server";
2227
import { singleton } from "~/utils/singleton";
28+
import { legacyRunEngineWorker } from "../legacyRunEngineWorker.server";
2329
import { concurrencyTracker } from "../services/taskRunConcurrencyTracker.server";
2430
import { attributesFromAuthenticatedEnv, tracer } from "../tracer.server";
2531
import { AsyncWorker } from "./asyncWorker.server";
32+
import {
33+
MARQS_DELAYED_REQUEUE_THRESHOLD_IN_MS,
34+
MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET,
35+
MARQS_RETRY_PRIORITY_TIMESTAMP_OFFSET,
36+
MARQS_SCHEDULED_REQUEUE_AVAILABLE_AT_THRESHOLD_IN_MS,
37+
} from "./constants.server";
2638
import { FairDequeuingStrategy } from "./fairDequeuingStrategy.server";
2739
import { MarQSShortKeyProducer } from "./marqsKeyProducer";
2840
import {
@@ -36,18 +48,6 @@ import {
3648
VisibilityTimeoutStrategy,
3749
} from "./types";
3850
import { V3LegacyRunEngineWorkerVisibilityTimeout } from "./v3VisibilityTimeout.server";
39-
import { legacyRunEngineWorker } from "../legacyRunEngineWorker.server";
40-
import {
41-
MARQS_DELAYED_REQUEUE_THRESHOLD_IN_MS,
42-
MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET,
43-
MARQS_RETRY_PRIORITY_TIMESTAMP_OFFSET,
44-
MARQS_SCHEDULED_REQUEUE_AVAILABLE_AT_THRESHOLD_IN_MS,
45-
} from "./constants.server";
46-
import { setInterval } from "node:timers/promises";
47-
import { tryCatch } from "@trigger.dev/core/utils";
48-
import { Worker, type WorkerConcurrencyOptions } from "@trigger.dev/redis-worker";
49-
import z from "zod";
50-
import { Logger } from "@trigger.dev/core/logger";
5151

5252
const KEY_PREFIX = "marqs:";
5353

@@ -78,6 +78,8 @@ export type MarQSOptions = {
7878
subscriber?: MessageQueueSubscriber;
7979
sharedWorkerQueueConsumerIntervalMs?: number;
8080
sharedWorkerQueueMaxMessageCount?: number;
81+
sharedWorkerQueueCooloffPeriodMs?: number;
82+
sharedWorkerQueueCooloffCountThreshold?: number;
8183
eagerDequeuingEnabled?: boolean;
8284
workerOptions: {
8385
pollIntervalMs?: number;
@@ -107,6 +109,9 @@ export class MarQS {
107109
public keys: MarQSKeyProducer;
108110
#rebalanceWorkers: Array<AsyncWorker> = [];
109111
private worker: Worker<typeof workerCatalog>;
112+
private queueDequeueCooloffPeriod: Map<string, number> = new Map();
113+
private queueDequeueCooloffCounts: Map<string, number> = new Map();
114+
private clearCooloffPeriodInterval: NodeJS.Timeout;
110115

111116
constructor(private readonly options: MarQSOptions) {
112117
this.redis = options.redis;
@@ -116,6 +121,12 @@ export class MarQS {
116121
this.#startRebalanceWorkers();
117122
this.#registerCommands();
118123

124+
// This will prevent these cooloff maps from growing indefinitely
125+
this.clearCooloffPeriodInterval = setInterval(() => {
126+
this.queueDequeueCooloffCounts.clear();
127+
this.queueDequeueCooloffPeriod.clear();
128+
}, 60_000 * 10); // 10 minutes
129+
119130
this.worker = new Worker({
120131
name: "marqs-worker",
121132
redisOptions: options.workerOptions.redisOptions,
@@ -135,6 +146,19 @@ export class MarQS {
135146
if (options.workerOptions?.enabled) {
136147
this.worker.start();
137148
}
149+
150+
this.#setupShutdownHandlers();
151+
}
152+
153+
#setupShutdownHandlers() {
154+
process.on("SIGTERM", () => this.shutdown("SIGTERM"));
155+
process.on("SIGINT", () => this.shutdown("SIGINT"));
156+
}
157+
158+
async shutdown(signal: NodeJS.Signals) {
159+
console.log("👇 Shutting down marqs", this.name, signal);
160+
clearInterval(this.clearCooloffPeriodInterval);
161+
this.#rebalanceWorkers.forEach((worker) => worker.stop());
138162
}
139163

140164
get name() {
@@ -737,7 +761,7 @@ export class MarQS {
737761
let processedCount = 0;
738762

739763
try {
740-
for await (const _ of setInterval(
764+
for await (const _ of setIntervalAsync(
741765
this.options.sharedWorkerQueueConsumerIntervalMs ?? 500,
742766
null,
743767
{
@@ -821,6 +845,7 @@ export class MarQS {
821845
let attemptedEnvs = 0;
822846
let attemptedQueues = 0;
823847
let messageCount = 0;
848+
let coolOffPeriodCount = 0;
824849

825850
// Try each queue in order, attempt to dequeue a message from each queue, keep going until we've tried all the queues
826851
for (const env of envQueues) {
@@ -829,6 +854,20 @@ export class MarQS {
829854
for (const messageQueue of env.queues) {
830855
attemptedQueues++;
831856

857+
const cooloffPeriod = this.queueDequeueCooloffPeriod.get(messageQueue);
858+
859+
// If the queue is in a cooloff period, skip attempting to dequeue from it
860+
if (cooloffPeriod) {
861+
// If the cooloff period is still active, skip attempting to dequeue from it
862+
if (cooloffPeriod > Date.now()) {
863+
coolOffPeriodCount++;
864+
continue;
865+
} else {
866+
// If the cooloff period is over, delete the cooloff period and attempt to dequeue from the queue
867+
this.queueDequeueCooloffPeriod.delete(messageQueue);
868+
}
869+
}
870+
832871
await this.#trace(
833872
"attemptDequeue",
834873
async (attemptDequeueSpan) => {
@@ -862,10 +901,32 @@ export class MarQS {
862901
);
863902

864903
if (!messages || messages.length === 0) {
904+
const cooloffCount = this.queueDequeueCooloffCounts.get(messageQueue) ?? 0;
905+
906+
const cooloffCountThreshold = Math.max(
907+
10,
908+
this.options.sharedWorkerQueueCooloffCountThreshold ?? 10
909+
); // minimum of 10
910+
911+
if (cooloffCount >= cooloffCountThreshold) {
912+
// If no messages were dequeued, set a cooloff period for the queue
913+
// This is to prevent the queue from being dequeued too frequently
914+
// and to give other queues a chance to dequeue messages more frequently
915+
this.queueDequeueCooloffPeriod.set(
916+
messageQueue,
917+
Date.now() + (this.options.sharedWorkerQueueCooloffPeriodMs ?? 10_000) // defaults to 10 seconds
918+
);
919+
this.queueDequeueCooloffCounts.delete(messageQueue);
920+
} else {
921+
this.queueDequeueCooloffCounts.set(messageQueue, cooloffCount + 1);
922+
}
923+
865924
attemptDequeueSpan.setAttribute("message_count", 0);
866925
return null; // Try next queue if no message was dequeued
867926
}
868927

928+
this.queueDequeueCooloffCounts.delete(messageQueue);
929+
869930
messageCount += messages.length;
870931

871932
attemptDequeueSpan.setAttribute("message_count", messages.length);
@@ -916,6 +977,7 @@ export class MarQS {
916977
span.setAttribute("attempted_queues", attemptedQueues);
917978
span.setAttribute("attempted_envs", attemptedEnvs);
918979
span.setAttribute("message_count", messageCount);
980+
span.setAttribute("cooloff_period_count", coolOffPeriodCount);
919981

920982
return;
921983
},
@@ -2614,6 +2676,8 @@ function getMarQSClient() {
26142676
sharedWorkerQueueConsumerIntervalMs: env.MARQS_SHARED_WORKER_QUEUE_CONSUMER_INTERVAL_MS,
26152677
sharedWorkerQueueMaxMessageCount: env.MARQS_SHARED_WORKER_QUEUE_MAX_MESSAGE_COUNT,
26162678
eagerDequeuingEnabled: env.MARQS_SHARED_WORKER_QUEUE_EAGER_DEQUEUE_ENABLED === "1",
2679+
sharedWorkerQueueCooloffCountThreshold: env.MARQS_SHARED_WORKER_QUEUE_COOLOFF_COUNT_THRESHOLD,
2680+
sharedWorkerQueueCooloffPeriodMs: env.MARQS_SHARED_WORKER_QUEUE_COOLOFF_PERIOD_MS,
26172681
workerOptions: {
26182682
enabled: env.MARQS_WORKER_ENABLED === "1",
26192683
pollIntervalMs: env.MARQS_WORKER_POLL_INTERVAL_MS,

0 commit comments

Comments
 (0)