From 93efac647a677653e8df01766e1ede71c0bf98ca Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Mon, 30 Mar 2026 10:58:37 -0700 Subject: [PATCH 01/10] improvement(queue): remove indirection for flag --- apps/sim/app/api/schedules/execute/route.ts | 4 ++-- apps/sim/app/api/workflows/[id]/execute/route.ts | 12 ++++++------ apps/sim/lib/core/async-jobs/config.ts | 12 ------------ apps/sim/lib/core/async-jobs/index.ts | 2 -- apps/sim/lib/webhooks/processor.ts | 4 ++-- 5 files changed, 10 insertions(+), 24 deletions(-) diff --git a/apps/sim/app/api/schedules/execute/route.ts b/apps/sim/app/api/schedules/execute/route.ts index d739f3aa67b..6c13d91f121 100644 --- a/apps/sim/app/api/schedules/execute/route.ts +++ b/apps/sim/app/api/schedules/execute/route.ts @@ -4,7 +4,7 @@ import { and, eq, isNull, lt, lte, ne, not, or, sql } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { v4 as uuidv4 } from 'uuid' import { verifyCronAuth } from '@/lib/auth/internal' -import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs' +import { getJobQueue } from '@/lib/core/async-jobs' import { createBullMQJobData, isBullMQEnabled } from '@/lib/core/bullmq' import { generateRequestId } from '@/lib/core/utils/request' import { enqueueWorkspaceDispatch } from '@/lib/core/workspace-dispatch' @@ -153,7 +153,7 @@ export async function GET(request: NextRequest) { `[${requestId}] Queued schedule execution task ${jobId} for workflow ${schedule.workflowId}` ) - if (shouldExecuteInline()) { + if (!isBullMQEnabled()) { try { await jobQueue.startJob(jobId) const output = await executeScheduleJob(payload) diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index df3fc41d434..d2bc6c24978 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -4,8 +4,8 @@ import { validate as uuidValidate, v4 as uuidv4 } from 'uuid' import { z } from 'zod' import { AuthType, checkHybridAuth, hasExternalApiCredentials } from '@/lib/auth/hybrid' import { admissionRejectedResponse, tryAdmit } from '@/lib/core/admission/gate' -import { getJobQueue, shouldExecuteInline, shouldUseBullMQ } from '@/lib/core/async-jobs' -import { createBullMQJobData } from '@/lib/core/bullmq' +import { getJobQueue } from '@/lib/core/async-jobs' +import { createBullMQJobData, isBullMQEnabled } from '@/lib/core/bullmq' import { createTimeoutAbortController, getTimeoutErrorMessage, @@ -209,7 +209,7 @@ async function handleAsyncExecution(params: AsyncExecutionParams): Promise { try { @@ -792,7 +792,7 @@ async function handleExecutePost( const executionVariables = cachedWorkflowData?.variables ?? workflow.variables ?? {} - if (shouldUseBullMQ() && !INLINE_TRIGGER_TYPES.has(triggerType)) { + if (isBullMQEnabled() && !INLINE_TRIGGER_TYPES.has(triggerType)) { try { const dispatchJobId = await enqueueDirectWorkflowExecution( { @@ -992,7 +992,7 @@ async function handleExecutePost( } if (shouldUseDraftState) { - const shouldDispatchViaQueue = shouldUseBullMQ() && !INLINE_TRIGGER_TYPES.has(triggerType) + const shouldDispatchViaQueue = isBullMQEnabled() && !INLINE_TRIGGER_TYPES.has(triggerType) if (shouldDispatchViaQueue) { const metadata: ExecutionMetadata = { requestId, diff --git a/apps/sim/lib/core/async-jobs/config.ts b/apps/sim/lib/core/async-jobs/config.ts index c4f0a4dcf83..4c8c51f0d19 100644 --- a/apps/sim/lib/core/async-jobs/config.ts +++ b/apps/sim/lib/core/async-jobs/config.ts @@ -95,18 +95,6 @@ export async function getInlineJobQueue(): Promise { return cachedInlineBackend } -/** - * Checks if jobs should be executed inline in-process. - * Database fallback is the only mode that still relies on inline execution. - */ -export function shouldExecuteInline(): boolean { - return getAsyncBackendType() === 'database' -} - -export function shouldUseBullMQ(): boolean { - return isBullMQEnabled() -} - /** * Resets the cached backend (useful for testing) */ diff --git a/apps/sim/lib/core/async-jobs/index.ts b/apps/sim/lib/core/async-jobs/index.ts index 76ec7072207..0bd43ccdd09 100644 --- a/apps/sim/lib/core/async-jobs/index.ts +++ b/apps/sim/lib/core/async-jobs/index.ts @@ -4,8 +4,6 @@ export { getInlineJobQueue, getJobQueue, resetJobQueueCache, - shouldExecuteInline, - shouldUseBullMQ, } from './config' export type { AsyncBackendType, diff --git a/apps/sim/lib/webhooks/processor.ts b/apps/sim/lib/webhooks/processor.ts index 3ded5d96a48..70732b12304 100644 --- a/apps/sim/lib/webhooks/processor.ts +++ b/apps/sim/lib/webhooks/processor.ts @@ -5,7 +5,7 @@ import { and, eq, isNull, or } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { v4 as uuidv4 } from 'uuid' import { isOrganizationOnTeamOrEnterprisePlan } from '@/lib/billing/core/subscription' -import { getInlineJobQueue, getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs' +import { getInlineJobQueue, getJobQueue } from '@/lib/core/async-jobs' import type { AsyncExecutionCorrelation } from '@/lib/core/async-jobs/types' import { createBullMQJobData, isBullMQEnabled } from '@/lib/core/bullmq' import { isProd } from '@/lib/core/config/feature-flags' @@ -1265,7 +1265,7 @@ export async function queueWebhookExecution( const isPolling = isPollingWebhookProvider(payload.provider) - if (isPolling && !shouldExecuteInline()) { + if (isPolling && isBullMQEnabled()) { const jobId = isBullMQEnabled() ? await enqueueWorkspaceDispatch({ id: executionId, From 6cc1233055553584dd1acd9ad207b1f6892471fe Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Mon, 30 Mar 2026 13:06:20 -0700 Subject: [PATCH 02/10] improvement(lua): return job id instedad of serializing within lua --- apps/sim/executor/execution/snapshot.ts | 2 +- apps/sim/lib/core/bullmq/connection.ts | 4 +- apps/sim/lib/core/config/env.ts | 1 - .../core/workspace-dispatch/redis-store.ts | 45 ++++++++++++------- .../executor/queued-workflow-execution.ts | 2 +- 5 files changed, 33 insertions(+), 21 deletions(-) diff --git a/apps/sim/executor/execution/snapshot.ts b/apps/sim/executor/execution/snapshot.ts index afe9bf52d7f..73f2f7275e1 100644 --- a/apps/sim/executor/execution/snapshot.ts +++ b/apps/sim/executor/execution/snapshot.ts @@ -28,7 +28,7 @@ export class ExecutionSnapshot { data.workflow, data.input, data.workflowVariables, - data.selectedOutputs, + Array.isArray(data.selectedOutputs) ? data.selectedOutputs : [], data.state ) } diff --git a/apps/sim/lib/core/bullmq/connection.ts b/apps/sim/lib/core/bullmq/connection.ts index e888f5772a7..80def9d5cb5 100644 --- a/apps/sim/lib/core/bullmq/connection.ts +++ b/apps/sim/lib/core/bullmq/connection.ts @@ -1,8 +1,8 @@ import type { ConnectionOptions } from 'bullmq' -import { env, isTruthy } from '@/lib/core/config/env' +import { env } from '@/lib/core/config/env' export function isBullMQEnabled(): boolean { - return isTruthy(env.CONCURRENCY_CONTROL_ENABLED) && Boolean(env.REDIS_URL) + return Boolean(env.REDIS_URL) } export function getBullMQConnectionOptions(): ConnectionOptions { diff --git a/apps/sim/lib/core/config/env.ts b/apps/sim/lib/core/config/env.ts index 5a1dc5743d1..be295643075 100644 --- a/apps/sim/lib/core/config/env.ts +++ b/apps/sim/lib/core/config/env.ts @@ -186,7 +186,6 @@ export const env = createEnv({ FREE_PLAN_LOG_RETENTION_DAYS: z.string().optional(), // Log retention days for free plan users // Admission & Burst Protection - CONCURRENCY_CONTROL_ENABLED: z.string().optional().default('false'), // Set to 'true' to enable BullMQ-based concurrency control (default: inline execution) ADMISSION_GATE_MAX_INFLIGHT: z.string().optional().default('500'), // Max concurrent in-flight execution requests per pod DISPATCH_MAX_QUEUE_PER_WORKSPACE: z.string().optional().default('1000'), // Max queued dispatch jobs per workspace DISPATCH_MAX_QUEUE_GLOBAL: z.string().optional().default('50000'), // Max queued dispatch jobs globally diff --git a/apps/sim/lib/core/workspace-dispatch/redis-store.ts b/apps/sim/lib/core/workspace-dispatch/redis-store.ts index 8fbf8dfee4f..9d296907b60 100644 --- a/apps/sim/lib/core/workspace-dispatch/redis-store.ts +++ b/apps/sim/lib/core/workspace-dispatch/redis-store.ts @@ -121,17 +121,6 @@ if selectedRecord == nil then end redis.call('ZADD', leaseKey(), leaseExpiresAt, leaseId) -selectedRecord.status = 'admitting' -selectedRecord.lease = { - workspaceId = workspaceId, - leaseId = leaseId -} -if selectedRecord.metadata == nil then - selectedRecord.metadata = {} -end -selectedRecord.metadata.dispatchLeaseExpiresAt = leaseExpiresAt - -redis.call('SET', jobPrefix .. selectedId, cjson.encode(selectedRecord), 'EX', jobTtlSeconds) redis.call('ZREM', laneKey(selectedLane), selectedId) local hasPending, minReadyAt = workspaceHasPending() @@ -146,7 +135,7 @@ end return cjson.encode({ type = 'admitted', - record = selectedRecord, + jobId = selectedId, leaseId = leaseId, leaseExpiresAt = leaseExpiresAt }) @@ -325,13 +314,37 @@ export class RedisWorkspaceDispatchStorage implements WorkspaceDispatchStorageAd String(JOB_TTL_SECONDS) ) - const parsed = JSON.parse(String(raw)) as WorkspaceDispatchClaimResult + const parsed = JSON.parse(String(raw)) switch (parsed.type) { - case WORKSPACE_DISPATCH_CLAIM_RESULTS.ADMITTED: - case WORKSPACE_DISPATCH_CLAIM_RESULTS.DELAYED: case WORKSPACE_DISPATCH_CLAIM_RESULTS.LIMIT_REACHED: case WORKSPACE_DISPATCH_CLAIM_RESULTS.EMPTY: - return parsed + return parsed as WorkspaceDispatchClaimResult + case WORKSPACE_DISPATCH_CLAIM_RESULTS.DELAYED: + return parsed as WorkspaceDispatchClaimResult + case WORKSPACE_DISPATCH_CLAIM_RESULTS.ADMITTED: { + const record = await this.getDispatchJobRecord(parsed.jobId) + if (!record) { + throw new Error(`Claimed job ${parsed.jobId} not found in store`) + } + + const updatedRecord: WorkspaceDispatchJobRecord = { + ...record, + status: 'admitting', + lease: { workspaceId, leaseId: parsed.leaseId }, + metadata: { + ...record.metadata, + dispatchLeaseExpiresAt: parsed.leaseExpiresAt, + }, + } + await this.saveDispatchJob(updatedRecord) + + return { + type: WORKSPACE_DISPATCH_CLAIM_RESULTS.ADMITTED, + record: updatedRecord, + leaseId: parsed.leaseId, + leaseExpiresAt: parsed.leaseExpiresAt, + } + } default: throw new Error( `Unknown dispatch claim result: ${String((parsed as { type?: string }).type)}` diff --git a/apps/sim/lib/workflows/executor/queued-workflow-execution.ts b/apps/sim/lib/workflows/executor/queued-workflow-execution.ts index c60ba860a11..ae1f8e4e105 100644 --- a/apps/sim/lib/workflows/executor/queued-workflow-execution.ts +++ b/apps/sim/lib/workflows/executor/queued-workflow-execution.ts @@ -108,7 +108,7 @@ export async function executeQueuedWorkflowJob( payload.workflow, payload.input, payload.variables, - payload.selectedOutputs ?? [] + Array.isArray(payload.selectedOutputs) ? payload.selectedOutputs : [] ) let callbacks = {} From 584d591143c010b6ec6793efe2a01f7d44e7c119 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Mon, 30 Mar 2026 13:21:02 -0700 Subject: [PATCH 03/10] memory improvements --- apps/sim/lib/core/bullmq/queues.ts | 28 +++++++++---------- .../core/workspace-dispatch/memory-store.ts | 2 ++ .../core/workspace-dispatch/redis-store.ts | 4 ++- 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/apps/sim/lib/core/bullmq/queues.ts b/apps/sim/lib/core/bullmq/queues.ts index 2278a309d95..0545d72f229 100644 --- a/apps/sim/lib/core/bullmq/queues.ts +++ b/apps/sim/lib/core/bullmq/queues.ts @@ -28,22 +28,22 @@ function getQueueDefaultOptions(type: JobType) { return { attempts: 3, backoff: { type: 'exponential' as const, delay: 1000 }, - removeOnComplete: { age: 24 * 60 * 60 }, - removeOnFail: { age: 7 * 24 * 60 * 60 }, + removeOnComplete: { age: 2 * 60 * 60, count: 1000 }, + removeOnFail: { age: 2 * 60 * 60, count: 500 }, } case 'webhook-execution': return { attempts: 2, backoff: { type: 'exponential' as const, delay: 2000 }, - removeOnComplete: { age: 24 * 60 * 60 }, - removeOnFail: { age: 3 * 24 * 60 * 60 }, + removeOnComplete: { age: 2 * 60 * 60, count: 1000 }, + removeOnFail: { age: 2 * 60 * 60, count: 500 }, } case 'schedule-execution': return { attempts: 2, backoff: { type: 'exponential' as const, delay: 5000 }, - removeOnComplete: { age: 24 * 60 * 60 }, - removeOnFail: { age: 3 * 24 * 60 * 60 }, + removeOnComplete: { age: 2 * 60 * 60, count: 1000 }, + removeOnFail: { age: 2 * 60 * 60, count: 500 }, } } } @@ -69,8 +69,8 @@ function createNamedQueue( defaultJobOptions: { attempts: 3, backoff: { type: 'exponential', delay: 5000 }, - removeOnComplete: { age: 24 * 60 * 60 }, - removeOnFail: { age: 7 * 24 * 60 * 60 }, + removeOnComplete: { age: 2 * 60 * 60, count: 500 }, + removeOnFail: { age: 2 * 60 * 60, count: 200 }, }, }) case KNOWLEDGE_DOCUMENT_PROCESSING_QUEUE: @@ -79,8 +79,8 @@ function createNamedQueue( defaultJobOptions: { attempts: 3, backoff: { type: 'exponential', delay: 1000 }, - removeOnComplete: { age: 24 * 60 * 60 }, - removeOnFail: { age: 7 * 24 * 60 * 60 }, + removeOnComplete: { age: 2 * 60 * 60, count: 500 }, + removeOnFail: { age: 2 * 60 * 60, count: 200 }, }, }) case MOTHERSHIP_JOB_EXECUTION_QUEUE: @@ -88,8 +88,8 @@ function createNamedQueue( connection: getBullMQConnectionOptions(), defaultJobOptions: { attempts: 1, - removeOnComplete: { age: 24 * 60 * 60 }, - removeOnFail: { age: 7 * 24 * 60 * 60 }, + removeOnComplete: { age: 2 * 60 * 60, count: 500 }, + removeOnFail: { age: 2 * 60 * 60, count: 200 }, }, }) case WORKSPACE_NOTIFICATION_DELIVERY_QUEUE: @@ -97,8 +97,8 @@ function createNamedQueue( connection: getBullMQConnectionOptions(), defaultJobOptions: { attempts: 1, - removeOnComplete: { age: 24 * 60 * 60 }, - removeOnFail: { age: 7 * 24 * 60 * 60 }, + removeOnComplete: { age: 2 * 60 * 60, count: 500 }, + removeOnFail: { age: 2 * 60 * 60, count: 200 }, }, }) } diff --git a/apps/sim/lib/core/workspace-dispatch/memory-store.ts b/apps/sim/lib/core/workspace-dispatch/memory-store.ts index 1c874d091be..f4e40ce88ea 100644 --- a/apps/sim/lib/core/workspace-dispatch/memory-store.ts +++ b/apps/sim/lib/core/workspace-dispatch/memory-store.ts @@ -473,6 +473,7 @@ export class MemoryWorkspaceDispatchStorage implements WorkspaceDispatchStorageA status: 'completed', completedAt: Date.now(), output, + bullmqPayload: undefined, })) } @@ -482,6 +483,7 @@ export class MemoryWorkspaceDispatchStorage implements WorkspaceDispatchStorageA status: 'failed', completedAt: Date.now(), error, + bullmqPayload: undefined, })) } diff --git a/apps/sim/lib/core/workspace-dispatch/redis-store.ts b/apps/sim/lib/core/workspace-dispatch/redis-store.ts index 9d296907b60..f6e5963ba21 100644 --- a/apps/sim/lib/core/workspace-dispatch/redis-store.ts +++ b/apps/sim/lib/core/workspace-dispatch/redis-store.ts @@ -12,7 +12,7 @@ import { const logger = createLogger('WorkspaceDispatchRedisStore') const DISPATCH_PREFIX = 'workspace-dispatch:v1' -const JOB_TTL_SECONDS = 48 * 60 * 60 +const JOB_TTL_SECONDS = 2 * 60 * 60 const SEQUENCE_KEY = `${DISPATCH_PREFIX}:sequence` const ACTIVE_WORKSPACES_KEY = `${DISPATCH_PREFIX}:workspaces` const GLOBAL_DEPTH_KEY = `${DISPATCH_PREFIX}:global-depth` @@ -549,6 +549,7 @@ export class RedisWorkspaceDispatchStorage implements WorkspaceDispatchStorageAd status: 'completed', completedAt: Date.now(), output, + bullmqPayload: undefined, })) await this.redis.decr(GLOBAL_DEPTH_KEY).catch(() => undefined) } @@ -559,6 +560,7 @@ export class RedisWorkspaceDispatchStorage implements WorkspaceDispatchStorageAd status: 'failed', completedAt: Date.now(), error, + bullmqPayload: undefined, })) await this.redis.decr(GLOBAL_DEPTH_KEY).catch(() => undefined) } From c4987da0996e511a1b103accdeb161f70f9084ea Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Mon, 30 Mar 2026 13:43:19 -0700 Subject: [PATCH 04/10] cleanup code --- .../core/workspace-dispatch/redis-store.ts | 36 ++++++++++++------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/apps/sim/lib/core/workspace-dispatch/redis-store.ts b/apps/sim/lib/core/workspace-dispatch/redis-store.ts index f6e5963ba21..f0498b63cb3 100644 --- a/apps/sim/lib/core/workspace-dispatch/redis-store.ts +++ b/apps/sim/lib/core/workspace-dispatch/redis-store.ts @@ -314,26 +314,38 @@ export class RedisWorkspaceDispatchStorage implements WorkspaceDispatchStorageAd String(JOB_TTL_SECONDS) ) - const parsed = JSON.parse(String(raw)) - switch (parsed.type) { + interface LuaClaimResponse { + type: string + jobId?: string + leaseId?: string + leaseExpiresAt?: number + nextReadyAt?: number + } + + const lua: LuaClaimResponse = JSON.parse(String(raw)) + switch (lua.type) { case WORKSPACE_DISPATCH_CLAIM_RESULTS.LIMIT_REACHED: + return { type: WORKSPACE_DISPATCH_CLAIM_RESULTS.LIMIT_REACHED } case WORKSPACE_DISPATCH_CLAIM_RESULTS.EMPTY: - return parsed as WorkspaceDispatchClaimResult + return { type: WORKSPACE_DISPATCH_CLAIM_RESULTS.EMPTY } case WORKSPACE_DISPATCH_CLAIM_RESULTS.DELAYED: - return parsed as WorkspaceDispatchClaimResult + return { + type: WORKSPACE_DISPATCH_CLAIM_RESULTS.DELAYED, + nextReadyAt: lua.nextReadyAt ?? Date.now(), + } case WORKSPACE_DISPATCH_CLAIM_RESULTS.ADMITTED: { - const record = await this.getDispatchJobRecord(parsed.jobId) + const record = await this.getDispatchJobRecord(lua.jobId!) if (!record) { - throw new Error(`Claimed job ${parsed.jobId} not found in store`) + throw new Error(`Claimed job ${lua.jobId} not found in store`) } const updatedRecord: WorkspaceDispatchJobRecord = { ...record, status: 'admitting', - lease: { workspaceId, leaseId: parsed.leaseId }, + lease: { workspaceId, leaseId: lua.leaseId! }, metadata: { ...record.metadata, - dispatchLeaseExpiresAt: parsed.leaseExpiresAt, + dispatchLeaseExpiresAt: lua.leaseExpiresAt!, }, } await this.saveDispatchJob(updatedRecord) @@ -341,14 +353,12 @@ export class RedisWorkspaceDispatchStorage implements WorkspaceDispatchStorageAd return { type: WORKSPACE_DISPATCH_CLAIM_RESULTS.ADMITTED, record: updatedRecord, - leaseId: parsed.leaseId, - leaseExpiresAt: parsed.leaseExpiresAt, + leaseId: lua.leaseId!, + leaseExpiresAt: lua.leaseExpiresAt!, } } default: - throw new Error( - `Unknown dispatch claim result: ${String((parsed as { type?: string }).type)}` - ) + throw new Error(`Unknown dispatch claim result: ${String(lua.type)}`) } } From 6b9a1bc7e72f23568d9a1609817748f1a33350c9 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Mon, 30 Mar 2026 13:52:17 -0700 Subject: [PATCH 05/10] fix types --- .../workflows/[id]/execute/route.async.test.ts | 15 ++++++++++++--- bun.lock | 1 - 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/apps/sim/app/api/workflows/[id]/execute/route.async.test.ts b/apps/sim/app/api/workflows/[id]/execute/route.async.test.ts index 355ae6ddf06..52c71151245 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.async.test.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.async.test.ts @@ -46,15 +46,24 @@ vi.mock('@/lib/core/async-jobs', () => ({ completeJob: vi.fn(), markJobFailed: vi.fn(), }), - shouldExecuteInline: vi.fn().mockReturnValue(false), - shouldUseBullMQ: vi.fn().mockReturnValue(true), })) vi.mock('@/lib/core/bullmq', () => ({ - createBullMQJobData: vi.fn((payload: unknown, metadata?: unknown) => ({ payload, metadata })), + createBullMQJobData: vi.fn((payload: unknown, metadata?: unknown) => ({ + payload, + metadata: metadata ?? {}, + })), + isBullMQEnabled: vi.fn().mockReturnValue(true), })) vi.mock('@/lib/core/workspace-dispatch', () => ({ + DispatchQueueFullError: class DispatchQueueFullError extends Error { + statusCode = 503 + constructor(scope: string, depth: number, limit: number) { + super(`${scope} queue at capacity (${depth}/${limit})`) + this.name = 'DispatchQueueFullError' + } + }, enqueueWorkspaceDispatch: mockEnqueueWorkspaceDispatch, waitForDispatchJob: vi.fn(), })) diff --git a/bun.lock b/bun.lock index 65639871af6..fe8b9af4a63 100644 --- a/bun.lock +++ b/bun.lock @@ -1,6 +1,5 @@ { "lockfileVersion": 1, - "configVersion": 0, "workspaces": { "": { "name": "simstudio", From d97f27ec32ace3d530df3a2df94fb5998c58ab18 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Mon, 30 Mar 2026 14:08:28 -0700 Subject: [PATCH 06/10] fix trigger dev, bullmq, inline prioritization --- apps/sim/app/api/schedules/execute/route.ts | 3 ++- .../app/api/workflows/[id]/execute/route.ts | 3 ++- apps/sim/lib/webhooks/processor.ts | 22 +++++++++---------- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/apps/sim/app/api/schedules/execute/route.ts b/apps/sim/app/api/schedules/execute/route.ts index 6c13d91f121..5ab2aaa7863 100644 --- a/apps/sim/app/api/schedules/execute/route.ts +++ b/apps/sim/app/api/schedules/execute/route.ts @@ -6,6 +6,7 @@ import { v4 as uuidv4 } from 'uuid' import { verifyCronAuth } from '@/lib/auth/internal' import { getJobQueue } from '@/lib/core/async-jobs' import { createBullMQJobData, isBullMQEnabled } from '@/lib/core/bullmq' +import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags' import { generateRequestId } from '@/lib/core/utils/request' import { enqueueWorkspaceDispatch } from '@/lib/core/workspace-dispatch' import { @@ -153,7 +154,7 @@ export async function GET(request: NextRequest) { `[${requestId}] Queued schedule execution task ${jobId} for workflow ${schedule.workflowId}` ) - if (!isBullMQEnabled()) { + if (!isBullMQEnabled() && !isTriggerDevEnabled) { try { await jobQueue.startJob(jobId) const output = await executeScheduleJob(payload) diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index d2bc6c24978..ce3c342d263 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -6,6 +6,7 @@ import { AuthType, checkHybridAuth, hasExternalApiCredentials } from '@/lib/auth import { admissionRejectedResponse, tryAdmit } from '@/lib/core/admission/gate' import { getJobQueue } from '@/lib/core/async-jobs' import { createBullMQJobData, isBullMQEnabled } from '@/lib/core/bullmq' +import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags' import { createTimeoutAbortController, getTimeoutErrorMessage, @@ -238,7 +239,7 @@ async function handleAsyncExecution(params: AsyncExecutionParams): Promise { try { diff --git a/apps/sim/lib/webhooks/processor.ts b/apps/sim/lib/webhooks/processor.ts index 70732b12304..be11209ee86 100644 --- a/apps/sim/lib/webhooks/processor.ts +++ b/apps/sim/lib/webhooks/processor.ts @@ -8,7 +8,7 @@ import { isOrganizationOnTeamOrEnterprisePlan } from '@/lib/billing/core/subscri import { getInlineJobQueue, getJobQueue } from '@/lib/core/async-jobs' import type { AsyncExecutionCorrelation } from '@/lib/core/async-jobs/types' import { createBullMQJobData, isBullMQEnabled } from '@/lib/core/bullmq' -import { isProd } from '@/lib/core/config/feature-flags' +import { isProd, isTriggerDevEnabled } from '@/lib/core/config/feature-flags' import { safeCompare } from '@/lib/core/security/encryption' import { enqueueWorkspaceDispatch } from '@/lib/core/workspace-dispatch' import { getEffectiveDecryptedEnv } from '@/lib/environment/utils' @@ -1265,9 +1265,16 @@ export async function queueWebhookExecution( const isPolling = isPollingWebhookProvider(payload.provider) - if (isPolling && isBullMQEnabled()) { - const jobId = isBullMQEnabled() - ? await enqueueWorkspaceDispatch({ + if (isPolling && (isTriggerDevEnabled || isBullMQEnabled())) { + const jobId = isTriggerDevEnabled + ? await (await getJobQueue()).enqueue('webhook-execution', payload, { + metadata: { + workflowId: foundWorkflow.id, + userId: actorUserId, + correlation, + }, + }) + : await enqueueWorkspaceDispatch({ id: executionId, workspaceId: foundWorkflow.workspaceId, lane: 'runtime', @@ -1284,13 +1291,6 @@ export async function queueWebhookExecution( correlation, }, }) - : await (await getJobQueue()).enqueue('webhook-execution', payload, { - metadata: { - workflowId: foundWorkflow.id, - userId: actorUserId, - correlation, - }, - }) logger.info( `[${options.requestId}] Queued polling webhook execution task ${jobId} for ${foundWebhook.provider} webhook via job queue` ) From 72d8a68c12e5b3fcdd68f1ad644bb0cf996f283d Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Mon, 30 Mar 2026 14:11:28 -0700 Subject: [PATCH 07/10] address comments --- apps/sim/lib/core/workspace-dispatch/redis-store.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/apps/sim/lib/core/workspace-dispatch/redis-store.ts b/apps/sim/lib/core/workspace-dispatch/redis-store.ts index f0498b63cb3..d2f66f9d609 100644 --- a/apps/sim/lib/core/workspace-dispatch/redis-store.ts +++ b/apps/sim/lib/core/workspace-dispatch/redis-store.ts @@ -336,7 +336,11 @@ export class RedisWorkspaceDispatchStorage implements WorkspaceDispatchStorageAd case WORKSPACE_DISPATCH_CLAIM_RESULTS.ADMITTED: { const record = await this.getDispatchJobRecord(lua.jobId!) if (!record) { - throw new Error(`Claimed job ${lua.jobId} not found in store`) + await this.redis.zrem(workspaceLeaseKey(workspaceId), lua.leaseId!).catch(() => undefined) + logger.warn('Claimed job record expired before status update, lease released', { + jobId: lua.jobId, + }) + return { type: WORKSPACE_DISPATCH_CLAIM_RESULTS.EMPTY } } const updatedRecord: WorkspaceDispatchJobRecord = { From f52ab39ac6bf2c1fe5a5988597b72db3bf1f8473 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Mon, 30 Mar 2026 14:17:06 -0700 Subject: [PATCH 08/10] fix tests --- apps/sim/lib/webhooks/processor.test.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/apps/sim/lib/webhooks/processor.test.ts b/apps/sim/lib/webhooks/processor.test.ts index 86876fda02b..a56ecda20af 100644 --- a/apps/sim/lib/webhooks/processor.test.ts +++ b/apps/sim/lib/webhooks/processor.test.ts @@ -11,14 +11,12 @@ const { mockEnqueue, mockEnqueueWorkspaceDispatch, mockGetJobQueue, - mockShouldExecuteInline, } = vi.hoisted(() => ({ mockUuidV4: vi.fn(), mockPreprocessExecution: vi.fn(), mockEnqueue: vi.fn(), mockEnqueueWorkspaceDispatch: vi.fn(), mockGetJobQueue: vi.fn(), - mockShouldExecuteInline: vi.fn(), })) vi.mock('@sim/db', () => ({ @@ -61,7 +59,6 @@ vi.mock('@/lib/billing/subscriptions/utils', () => ({ vi.mock('@/lib/core/async-jobs', () => ({ getInlineJobQueue: vi.fn(), getJobQueue: mockGetJobQueue, - shouldExecuteInline: mockShouldExecuteInline, })) vi.mock('@/lib/core/bullmq', () => ({ @@ -75,6 +72,7 @@ vi.mock('@/lib/core/workspace-dispatch', () => ({ vi.mock('@/lib/core/config/feature-flags', () => ({ isProd: false, + isTriggerDevEnabled: false, })) vi.mock('@/lib/core/security/encryption', () => ({ @@ -155,7 +153,6 @@ describe('webhook processor execution identity', () => { mockEnqueue.mockResolvedValue('job-1') mockEnqueueWorkspaceDispatch.mockResolvedValue('job-1') mockGetJobQueue.mockResolvedValue({ enqueue: mockEnqueue }) - mockShouldExecuteInline.mockReturnValue(false) mockUuidV4.mockReturnValue('generated-execution-id') }) From d34d6f4cee5f711bba53f968419f818f44c0858b Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Mon, 30 Mar 2026 14:42:06 -0700 Subject: [PATCH 09/10] remove unused lua code --- apps/sim/lib/core/workspace-dispatch/redis-store.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/apps/sim/lib/core/workspace-dispatch/redis-store.ts b/apps/sim/lib/core/workspace-dispatch/redis-store.ts index d2f66f9d609..dd305f20ae6 100644 --- a/apps/sim/lib/core/workspace-dispatch/redis-store.ts +++ b/apps/sim/lib/core/workspace-dispatch/redis-store.ts @@ -27,7 +27,6 @@ local sequenceKey = ARGV[7] local activeWorkspacesKey = ARGV[8] local jobPrefix = ARGV[9] local workspacePrefix = ARGV[10] -local jobTtlSeconds = tonumber(ARGV[11]) local function laneKey(lane) return workspacePrefix .. workspaceId .. ':lane:' .. lane @@ -310,8 +309,7 @@ export class RedisWorkspaceDispatchStorage implements WorkspaceDispatchStorageAd SEQUENCE_KEY, ACTIVE_WORKSPACES_KEY, `${DISPATCH_PREFIX}:job:`, - `${DISPATCH_PREFIX}:workspace:`, - String(JOB_TTL_SECONDS) + `${DISPATCH_PREFIX}:workspace:` ) interface LuaClaimResponse { From 07716cdb10c75048236aca66cc6534a65769645e Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Wed, 1 Apr 2026 18:14:51 -0700 Subject: [PATCH 10/10] fix(cjson): santization bullmq payloads accurately --- apps/sim/background/workflow-execution.ts | 3 +- apps/sim/executor/execution/block-executor.ts | 2 +- apps/sim/executor/execution/executor.ts | 4 +- .../executor/execution/snapshot-serializer.ts | 2 +- apps/sim/lib/core/utils/json-sanitize.ts | 68 +++++++++++++++++++ .../workflows/executor/execute-workflow.ts | 2 +- .../executor/queued-workflow-execution.ts | 5 +- 7 files changed, 80 insertions(+), 6 deletions(-) create mode 100644 apps/sim/lib/core/utils/json-sanitize.ts diff --git a/apps/sim/background/workflow-execution.ts b/apps/sim/background/workflow-execution.ts index aa2411a2580..438fd5ad153 100644 --- a/apps/sim/background/workflow-execution.ts +++ b/apps/sim/background/workflow-execution.ts @@ -3,6 +3,7 @@ import { task } from '@trigger.dev/sdk' import { v4 as uuidv4 } from 'uuid' import type { AsyncExecutionCorrelation } from '@/lib/core/async-jobs/types' import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits' +import { ensureArray } from '@/lib/core/utils/json-sanitize' import { preprocessExecution } from '@/lib/execution/preprocessing' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' @@ -110,7 +111,7 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) { useDraftState: false, startTime: new Date().toISOString(), isClientSession: false, - callChain: payload.callChain, + callChain: ensureArray(payload.callChain), correlation, } diff --git a/apps/sim/executor/execution/block-executor.ts b/apps/sim/executor/execution/block-executor.ts index 5044eab5639..03d351795bb 100644 --- a/apps/sim/executor/execution/block-executor.ts +++ b/apps/sim/executor/execution/block-executor.ts @@ -148,7 +148,7 @@ export class BlockExecutor { block, streamingExec, resolvedInputs, - ctx.selectedOutputs ?? [] + Array.isArray(ctx.selectedOutputs) ? ctx.selectedOutputs : [] ) } diff --git a/apps/sim/executor/execution/executor.ts b/apps/sim/executor/execution/executor.ts index 8e3a8c8c8c9..5a8d8b8b6a9 100644 --- a/apps/sim/executor/execution/executor.ts +++ b/apps/sim/executor/execution/executor.ts @@ -325,7 +325,9 @@ export class DAGExecutor { : new Set(), workflow: this.workflow, stream: this.contextExtensions.stream ?? false, - selectedOutputs: this.contextExtensions.selectedOutputs ?? [], + selectedOutputs: Array.isArray(this.contextExtensions.selectedOutputs) + ? this.contextExtensions.selectedOutputs + : [], edges: this.contextExtensions.edges ?? [], onStream: this.contextExtensions.onStream, onBlockStart: this.contextExtensions.onBlockStart, diff --git a/apps/sim/executor/execution/snapshot-serializer.ts b/apps/sim/executor/execution/snapshot-serializer.ts index 6674a88ab6e..98e0ccea512 100644 --- a/apps/sim/executor/execution/snapshot-serializer.ts +++ b/apps/sim/executor/execution/snapshot-serializer.ts @@ -119,7 +119,7 @@ export function serializePauseSnapshot( context.workflow, {}, context.workflowVariables ?? {}, - context.selectedOutputs ?? [], + Array.isArray(context.selectedOutputs) ? context.selectedOutputs : [], state ) diff --git a/apps/sim/lib/core/utils/json-sanitize.ts b/apps/sim/lib/core/utils/json-sanitize.ts new file mode 100644 index 00000000000..3448d74958b --- /dev/null +++ b/apps/sim/lib/core/utils/json-sanitize.ts @@ -0,0 +1,68 @@ +/** + * Sanitization for JSON data round-tripped through Redis Lua cjson. + * + * Lua's cjson library cannot distinguish between empty arrays `[]` and empty objects `{}`. + * Both serialize to `{}` in Lua tables. When BullMQ's internal Lua scripts touch job data, + * any empty array in the payload silently becomes `{}`. + * + * Applied once at the worker boundary before data enters the execution engine. + */ + +/** + * Returns `value` if it's an array, otherwise `[]`. + */ +export function ensureArray(value: unknown): T[] { + return Array.isArray(value) ? value : [] +} + +const EXECUTION_STATE_ARRAY_FIELDS = [ + 'executedBlocks', + 'blockLogs', + 'completedLoops', + 'activeExecutionPath', + 'pendingQueue', + 'remainingEdges', + 'completedPauseContexts', +] + +/** + * Normalizes all known array fields on a BullMQ-deserialized workflow execution payload. + * Mutates in place — call once before passing into the execution engine. + */ +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export function sanitizeBullMQPayload(payload: any): void { + if (!payload) return + + payload.selectedOutputs = ensureArray(payload.selectedOutputs) + + if (payload.metadata) { + payload.metadata.callChain = ensureArray(payload.metadata.callChain) + + if (payload.metadata.pendingBlocks !== undefined) { + payload.metadata.pendingBlocks = ensureArray(payload.metadata.pendingBlocks) + } + + if (payload.metadata.workflowStateOverride?.edges !== undefined) { + payload.metadata.workflowStateOverride.edges = ensureArray( + payload.metadata.workflowStateOverride.edges + ) + } + } + + if (payload.runFromBlock?.sourceSnapshot) { + const state = payload.runFromBlock.sourceSnapshot + for (const field of EXECUTION_STATE_ARRAY_FIELDS) { + if (field in state && !Array.isArray(state[field])) { + state[field] = [] + } + } + + if (state.dagIncomingEdges && typeof state.dagIncomingEdges === 'object') { + for (const key of Object.keys(state.dagIncomingEdges)) { + if (!Array.isArray(state.dagIncomingEdges[key])) { + state.dagIncomingEdges[key] = [] + } + } + } + } +} diff --git a/apps/sim/lib/workflows/executor/execute-workflow.ts b/apps/sim/lib/workflows/executor/execute-workflow.ts index 6381d694fd1..599b4984a7d 100644 --- a/apps/sim/lib/workflows/executor/execute-workflow.ts +++ b/apps/sim/lib/workflows/executor/execute-workflow.ts @@ -76,7 +76,7 @@ export async function executeWorkflow( workflow, input, workflow.variables || {}, - streamConfig?.selectedOutputs || [] + Array.isArray(streamConfig?.selectedOutputs) ? streamConfig.selectedOutputs : [] ) const result = await executeWorkflowCore({ diff --git a/apps/sim/lib/workflows/executor/queued-workflow-execution.ts b/apps/sim/lib/workflows/executor/queued-workflow-execution.ts index ae1f8e4e105..6f1b09f35e4 100644 --- a/apps/sim/lib/workflows/executor/queued-workflow-execution.ts +++ b/apps/sim/lib/workflows/executor/queued-workflow-execution.ts @@ -1,5 +1,6 @@ import { createLogger } from '@sim/logger' import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits' +import { sanitizeBullMQPayload } from '@/lib/core/utils/json-sanitize' import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/event-buffer' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' @@ -103,12 +104,14 @@ export async function executeQueuedWorkflowJob( } try { + sanitizeBullMQPayload(payload) + const snapshot = new ExecutionSnapshot( metadata, payload.workflow, payload.input, payload.variables, - Array.isArray(payload.selectedOutputs) ? payload.selectedOutputs : [] + payload.selectedOutputs as string[] ) let callbacks = {}