Skip to content
5 changes: 3 additions & 2 deletions apps/sim/app/api/schedules/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import { and, eq, isNull, lt, lte, ne, not, or, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { verifyCronAuth } from '@/lib/auth/internal'
import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
import { getJobQueue } from '@/lib/core/async-jobs'
import { createBullMQJobData, isBullMQEnabled } from '@/lib/core/bullmq'
import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
import { generateRequestId } from '@/lib/core/utils/request'
import { enqueueWorkspaceDispatch } from '@/lib/core/workspace-dispatch'
import {
Expand Down Expand Up @@ -157,7 +158,7 @@ export async function GET(request: NextRequest) {
`[${requestId}] Queued schedule execution task ${jobId} for workflow ${schedule.workflowId}`
)

if (shouldExecuteInline()) {
if (!isBullMQEnabled() && !isTriggerDevEnabled) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 isTriggerDevEnabled used inconsistently as a value vs. function call

isBullMQEnabled() is called as a function (parentheses), while isTriggerDevEnabled is referenced as a constant (no parentheses). Both are correct — isBullMQEnabled is a function and isTriggerDevEnabled is a module-level constant — but the asymmetry can confuse readers who might expect both to be function calls given the naming convention isFoo.

The same pattern appears at apps/sim/app/api/workflows/[id]/execute/route.ts (line 246) and apps/sim/lib/webhooks/processor.ts (line 1268). No code change is strictly necessary, but adding an inline comment clarifying that isTriggerDevEnabled is a constant (evaluated at module load) would aid readability.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

try {
await jobQueue.startJob(jobId)
const output = await executeScheduleJob(payload)
Expand Down
15 changes: 12 additions & 3 deletions apps/sim/app/api/workflows/[id]/execute/route.async.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,24 @@ vi.mock('@/lib/core/async-jobs', () => ({
completeJob: vi.fn(),
markJobFailed: vi.fn(),
}),
shouldExecuteInline: vi.fn().mockReturnValue(false),
shouldUseBullMQ: vi.fn().mockReturnValue(true),
}))

vi.mock('@/lib/core/bullmq', () => ({
createBullMQJobData: vi.fn((payload: unknown, metadata?: unknown) => ({ payload, metadata })),
createBullMQJobData: vi.fn((payload: unknown, metadata?: unknown) => ({
payload,
metadata: metadata ?? {},
})),
isBullMQEnabled: vi.fn().mockReturnValue(true),
}))

vi.mock('@/lib/core/workspace-dispatch', () => ({
DispatchQueueFullError: class DispatchQueueFullError extends Error {
statusCode = 503
constructor(scope: string, depth: number, limit: number) {
super(`${scope} queue at capacity (${depth}/${limit})`)
this.name = 'DispatchQueueFullError'
}
},
enqueueWorkspaceDispatch: mockEnqueueWorkspaceDispatch,
waitForDispatchJob: vi.fn(),
}))
Expand Down
13 changes: 7 additions & 6 deletions apps/sim/app/api/workflows/[id]/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import { validate as uuidValidate, v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { AuthType, checkHybridAuth, hasExternalApiCredentials } from '@/lib/auth/hybrid'
import { admissionRejectedResponse, tryAdmit } from '@/lib/core/admission/gate'
import { getJobQueue, shouldExecuteInline, shouldUseBullMQ } from '@/lib/core/async-jobs'
import { createBullMQJobData } from '@/lib/core/bullmq'
import { getJobQueue } from '@/lib/core/async-jobs'
import { createBullMQJobData, isBullMQEnabled } from '@/lib/core/bullmq'
import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
import {
createTimeoutAbortController,
getTimeoutErrorMessage,
Expand Down Expand Up @@ -216,7 +217,7 @@ async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextR
}

try {
const useBullMQ = shouldUseBullMQ()
const useBullMQ = isBullMQEnabled()
const jobQueue = useBullMQ ? null : await getJobQueue()
const jobId = useBullMQ
? await enqueueWorkspaceDispatch({
Expand All @@ -242,7 +243,7 @@ async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextR

asyncLogger.info('Queued async workflow execution', { jobId })

if (shouldExecuteInline() && jobQueue) {
if (!isBullMQEnabled() && !isTriggerDevEnabled && jobQueue) {
const inlineJobQueue = jobQueue
void (async () => {
try {
Expand Down Expand Up @@ -793,7 +794,7 @@ async function handleExecutePost(

const executionVariables = cachedWorkflowData?.variables ?? workflow.variables ?? {}

if (shouldUseBullMQ() && !INLINE_TRIGGER_TYPES.has(triggerType)) {
if (isBullMQEnabled() && !INLINE_TRIGGER_TYPES.has(triggerType)) {
try {
const dispatchJobId = await enqueueDirectWorkflowExecution(
{
Expand Down Expand Up @@ -993,7 +994,7 @@ async function handleExecutePost(
}

if (shouldUseDraftState) {
const shouldDispatchViaQueue = shouldUseBullMQ() && !INLINE_TRIGGER_TYPES.has(triggerType)
const shouldDispatchViaQueue = isBullMQEnabled() && !INLINE_TRIGGER_TYPES.has(triggerType)
if (shouldDispatchViaQueue) {
const metadata: ExecutionMetadata = {
requestId,
Expand Down
3 changes: 2 additions & 1 deletion apps/sim/background/workflow-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { task } from '@trigger.dev/sdk'
import { v4 as uuidv4 } from 'uuid'
import type { AsyncExecutionCorrelation } from '@/lib/core/async-jobs/types'
import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits'
import { ensureArray } from '@/lib/core/utils/json-sanitize'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
Expand Down Expand Up @@ -110,7 +111,7 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
useDraftState: false,
startTime: new Date().toISOString(),
isClientSession: false,
callChain: payload.callChain,
callChain: ensureArray(payload.callChain),
correlation,
}

Expand Down
2 changes: 1 addition & 1 deletion apps/sim/executor/execution/block-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ export class BlockExecutor {
block,
streamingExec,
resolvedInputs,
ctx.selectedOutputs ?? []
Array.isArray(ctx.selectedOutputs) ? ctx.selectedOutputs : []
)
}

Expand Down
4 changes: 3 additions & 1 deletion apps/sim/executor/execution/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,9 @@ export class DAGExecutor {
: new Set(),
workflow: this.workflow,
stream: this.contextExtensions.stream ?? false,
selectedOutputs: this.contextExtensions.selectedOutputs ?? [],
selectedOutputs: Array.isArray(this.contextExtensions.selectedOutputs)
? this.contextExtensions.selectedOutputs
: [],
edges: this.contextExtensions.edges ?? [],
onStream: this.contextExtensions.onStream,
onBlockStart: this.contextExtensions.onBlockStart,
Expand Down
2 changes: 1 addition & 1 deletion apps/sim/executor/execution/snapshot-serializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ export function serializePauseSnapshot(
context.workflow,
{},
context.workflowVariables ?? {},
context.selectedOutputs ?? [],
Array.isArray(context.selectedOutputs) ? context.selectedOutputs : [],
state
)

Expand Down
2 changes: 1 addition & 1 deletion apps/sim/executor/execution/snapshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export class ExecutionSnapshot {
data.workflow,
data.input,
data.workflowVariables,
data.selectedOutputs,
Array.isArray(data.selectedOutputs) ? data.selectedOutputs : [],
data.state
)
}
Expand Down
12 changes: 0 additions & 12 deletions apps/sim/lib/core/async-jobs/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,6 @@ export async function getInlineJobQueue(): Promise<JobQueueBackend> {
return cachedInlineBackend
}

/**
* Checks if jobs should be executed inline in-process.
* Database fallback is the only mode that still relies on inline execution.
*/
export function shouldExecuteInline(): boolean {
return getAsyncBackendType() === 'database'
}

export function shouldUseBullMQ(): boolean {
return isBullMQEnabled()
}

/**
* Resets the cached backend (useful for testing)
*/
Expand Down
2 changes: 0 additions & 2 deletions apps/sim/lib/core/async-jobs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ export {
getInlineJobQueue,
getJobQueue,
resetJobQueueCache,
shouldExecuteInline,
shouldUseBullMQ,
} from './config'
export type {
AsyncBackendType,
Expand Down
4 changes: 2 additions & 2 deletions apps/sim/lib/core/bullmq/connection.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import type { ConnectionOptions } from 'bullmq'
import { env, isTruthy } from '@/lib/core/config/env'
import { env } from '@/lib/core/config/env'

export function isBullMQEnabled(): boolean {
return isTruthy(env.CONCURRENCY_CONTROL_ENABLED) && Boolean(env.REDIS_URL)
return Boolean(env.REDIS_URL)
}
Comment on lines 4 to 6
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Implicit BullMQ activation breaks deployments without workers

isBullMQEnabled() now returns true whenever REDIS_URL is set. Previously it also required CONCURRENCY_CONTROL_ENABLED=true, which was the explicit opt-in gate. Many self-hosted deployments configure REDIS_URL for caching, rate-limiting, or session storage without running BullMQ workers. After this change, any such deployment will start routing async workflow executions to BullMQ queues that are never drained — jobs silently queue up and never run.

The old guard (isTruthy(env.CONCURRENCY_CONTROL_ENABLED) && Boolean(env.REDIS_URL)) ensured both the infrastructure (Redis) and the operator intent (CONCURRENCY_CONTROL_ENABLED=true) were present. Removing CONCURRENCY_CONTROL_ENABLED entirely eliminates that intent signal with no migration path.

If the goal is to consolidate configuration, consider keeping REDIS_URL as the sole gate but documenting this clearly, and ensuring all deployment guides explicitly note that providing REDIS_URL now enables BullMQ.


export function getBullMQConnectionOptions(): ConnectionOptions {
Expand Down
28 changes: 14 additions & 14 deletions apps/sim/lib/core/bullmq/queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,22 @@ function getQueueDefaultOptions(type: JobType) {
return {
attempts: 3,
backoff: { type: 'exponential' as const, delay: 1000 },
removeOnComplete: { age: 24 * 60 * 60 },
removeOnFail: { age: 7 * 24 * 60 * 60 },
removeOnComplete: { age: 2 * 60 * 60, count: 1000 },
removeOnFail: { age: 2 * 60 * 60, count: 500 },
}
case 'webhook-execution':
return {
Comment on lines +31 to 35
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Aggressive TTL reduction limits observability and retry windows

All BullMQ queue retention has been reduced from 24 h / 7 days down to 2 hours, and the same change applies to JOB_TTL_SECONDS in redis-store.ts (previously 48 h). While shorter retention helps control Redis memory, 2 hours is a very tight window:

  • A failed workflow job that fails at 01:00 and isn't noticed until 03:30 will have already been evicted — no state to inspect or manually retry.
  • removeOnFail at 2 h means failed jobs disappear before an on-call engineer in a different timezone can investigate them.

The previous removeOnFail values (7 days for workflow-execution, 3 days for webhook-execution and schedule-execution) provided a more reasonable recovery window. Consider retaining a longer failure TTL (e.g., 24–72 h) while keeping the shorter success TTL for completed jobs.

attempts: 2,
backoff: { type: 'exponential' as const, delay: 2000 },
removeOnComplete: { age: 24 * 60 * 60 },
removeOnFail: { age: 3 * 24 * 60 * 60 },
removeOnComplete: { age: 2 * 60 * 60, count: 1000 },
removeOnFail: { age: 2 * 60 * 60, count: 500 },
}
case 'schedule-execution':
return {
attempts: 2,
backoff: { type: 'exponential' as const, delay: 5000 },
removeOnComplete: { age: 24 * 60 * 60 },
removeOnFail: { age: 3 * 24 * 60 * 60 },
removeOnComplete: { age: 2 * 60 * 60, count: 1000 },
removeOnFail: { age: 2 * 60 * 60, count: 500 },
}
}
}
Expand All @@ -69,8 +69,8 @@ function createNamedQueue(
defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 5000 },
removeOnComplete: { age: 24 * 60 * 60 },
removeOnFail: { age: 7 * 24 * 60 * 60 },
removeOnComplete: { age: 2 * 60 * 60, count: 500 },
removeOnFail: { age: 2 * 60 * 60, count: 200 },
},
})
case KNOWLEDGE_DOCUMENT_PROCESSING_QUEUE:
Expand All @@ -79,26 +79,26 @@ function createNamedQueue(
defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 1000 },
removeOnComplete: { age: 24 * 60 * 60 },
removeOnFail: { age: 7 * 24 * 60 * 60 },
removeOnComplete: { age: 2 * 60 * 60, count: 500 },
removeOnFail: { age: 2 * 60 * 60, count: 200 },
},
})
case MOTHERSHIP_JOB_EXECUTION_QUEUE:
return new Queue(name, {
connection: getBullMQConnectionOptions(),
defaultJobOptions: {
attempts: 1,
removeOnComplete: { age: 24 * 60 * 60 },
removeOnFail: { age: 7 * 24 * 60 * 60 },
removeOnComplete: { age: 2 * 60 * 60, count: 500 },
removeOnFail: { age: 2 * 60 * 60, count: 200 },
},
})
case WORKSPACE_NOTIFICATION_DELIVERY_QUEUE:
return new Queue(name, {
connection: getBullMQConnectionOptions(),
defaultJobOptions: {
attempts: 1,
removeOnComplete: { age: 24 * 60 * 60 },
removeOnFail: { age: 7 * 24 * 60 * 60 },
removeOnComplete: { age: 2 * 60 * 60, count: 500 },
removeOnFail: { age: 2 * 60 * 60, count: 200 },
},
})
}
Expand Down
1 change: 0 additions & 1 deletion apps/sim/lib/core/config/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ export const env = createEnv({
FREE_PLAN_LOG_RETENTION_DAYS: z.string().optional(), // Log retention days for free plan users

// Admission & Burst Protection
CONCURRENCY_CONTROL_ENABLED: z.string().optional().default('false'), // Set to 'true' to enable BullMQ-based concurrency control (default: inline execution)
ADMISSION_GATE_MAX_INFLIGHT: z.string().optional().default('500'), // Max concurrent in-flight execution requests per pod
DISPATCH_MAX_QUEUE_PER_WORKSPACE: z.string().optional().default('1000'), // Max queued dispatch jobs per workspace
DISPATCH_MAX_QUEUE_GLOBAL: z.string().optional().default('50000'), // Max queued dispatch jobs globally
Expand Down
68 changes: 68 additions & 0 deletions apps/sim/lib/core/utils/json-sanitize.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* Sanitization for JSON data round-tripped through Redis Lua cjson.
*
* Lua's cjson library cannot distinguish between empty arrays `[]` and empty objects `{}`.
* Both serialize to `{}` in Lua tables. When BullMQ's internal Lua scripts touch job data,
* any empty array in the payload silently becomes `{}`.
*
* Applied once at the worker boundary before data enters the execution engine.
*/

/**
* Returns `value` if it's an array, otherwise `[]`.
*/
export function ensureArray<T>(value: unknown): T[] {
return Array.isArray(value) ? value : []
}

const EXECUTION_STATE_ARRAY_FIELDS = [
'executedBlocks',
'blockLogs',
'completedLoops',
'activeExecutionPath',
'pendingQueue',
'remainingEdges',
'completedPauseContexts',
]

/**
* Normalizes all known array fields on a BullMQ-deserialized workflow execution payload.
* Mutates in place — call once before passing into the execution engine.
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function sanitizeBullMQPayload(payload: any): void {
if (!payload) return

payload.selectedOutputs = ensureArray(payload.selectedOutputs)

if (payload.metadata) {
payload.metadata.callChain = ensureArray(payload.metadata.callChain)

if (payload.metadata.pendingBlocks !== undefined) {
payload.metadata.pendingBlocks = ensureArray(payload.metadata.pendingBlocks)
}

if (payload.metadata.workflowStateOverride?.edges !== undefined) {
payload.metadata.workflowStateOverride.edges = ensureArray(
payload.metadata.workflowStateOverride.edges
)
}
}

if (payload.runFromBlock?.sourceSnapshot) {
const state = payload.runFromBlock.sourceSnapshot
for (const field of EXECUTION_STATE_ARRAY_FIELDS) {
if (field in state && !Array.isArray(state[field])) {
state[field] = []
}
}

if (state.dagIncomingEdges && typeof state.dagIncomingEdges === 'object') {
for (const key of Object.keys(state.dagIncomingEdges)) {
if (!Array.isArray(state.dagIncomingEdges[key])) {
state.dagIncomingEdges[key] = []
}
}
}
}
Comment on lines +33 to +67
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 sanitizeBullMQPayload has no unit tests

The PR checklist marks "Tests added/updated and passing", but no test file for json-sanitize.ts appears in the diff. The sanitization logic handles several nested paths (payload.metadata.callChain, payload.metadata.workflowStateOverride.edges, payload.runFromBlock.sourceSnapshot.*, and dagIncomingEdges per-key) that are easy to accidentally break.

Consider adding a dedicated test file that exercises:

  • Top-level fields (selectedOutputs: {}[])
  • Nested metadata fields
  • The EXECUTION_STATE_ARRAY_FIELDS loop with mixed present/absent keys
  • The dagIncomingEdges per-key fix
  • A null/undefined payload (guard at line 34)

}
2 changes: 2 additions & 0 deletions apps/sim/lib/core/workspace-dispatch/memory-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ export class MemoryWorkspaceDispatchStorage implements WorkspaceDispatchStorageA
status: 'completed',
completedAt: Date.now(),
output,
bullmqPayload: undefined,
}))
}

Expand All @@ -482,6 +483,7 @@ export class MemoryWorkspaceDispatchStorage implements WorkspaceDispatchStorageA
status: 'failed',
completedAt: Date.now(),
error,
bullmqPayload: undefined,
}))
}

Expand Down
Loading
Loading