Skip to content

Commit 7c68061

Browse files
fix(cjson): santize bullmq payloads accurately (#3892)
* improvement(queue): remove indirection for flag * improvement(lua): return job id instedad of serializing within lua * memory improvements * cleanup code * fix types * fix trigger dev, bullmq, inline prioritization * address comments * fix tests * remove unused lua code * fix(cjson): santization bullmq payloads accurately
1 parent df6ceb6 commit 7c68061

File tree

19 files changed

+184
-88
lines changed

19 files changed

+184
-88
lines changed

apps/sim/app/api/schedules/execute/route.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@ import { and, eq, isNull, lt, lte, ne, not, or, sql } from 'drizzle-orm'
44
import { type NextRequest, NextResponse } from 'next/server'
55
import { v4 as uuidv4 } from 'uuid'
66
import { verifyCronAuth } from '@/lib/auth/internal'
7-
import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
7+
import { getJobQueue } from '@/lib/core/async-jobs'
88
import { createBullMQJobData, isBullMQEnabled } from '@/lib/core/bullmq'
9+
import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
910
import { generateRequestId } from '@/lib/core/utils/request'
1011
import { enqueueWorkspaceDispatch } from '@/lib/core/workspace-dispatch'
1112
import {
@@ -157,7 +158,7 @@ export async function GET(request: NextRequest) {
157158
`[${requestId}] Queued schedule execution task ${jobId} for workflow ${schedule.workflowId}`
158159
)
159160

160-
if (shouldExecuteInline()) {
161+
if (!isBullMQEnabled() && !isTriggerDevEnabled) {
161162
try {
162163
await jobQueue.startJob(jobId)
163164
const output = await executeScheduleJob(payload)

apps/sim/app/api/workflows/[id]/execute/route.async.test.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,24 @@ vi.mock('@/lib/core/async-jobs', () => ({
4646
completeJob: vi.fn(),
4747
markJobFailed: vi.fn(),
4848
}),
49-
shouldExecuteInline: vi.fn().mockReturnValue(false),
50-
shouldUseBullMQ: vi.fn().mockReturnValue(true),
5149
}))
5250

5351
vi.mock('@/lib/core/bullmq', () => ({
54-
createBullMQJobData: vi.fn((payload: unknown, metadata?: unknown) => ({ payload, metadata })),
52+
createBullMQJobData: vi.fn((payload: unknown, metadata?: unknown) => ({
53+
payload,
54+
metadata: metadata ?? {},
55+
})),
56+
isBullMQEnabled: vi.fn().mockReturnValue(true),
5557
}))
5658

5759
vi.mock('@/lib/core/workspace-dispatch', () => ({
60+
DispatchQueueFullError: class DispatchQueueFullError extends Error {
61+
statusCode = 503
62+
constructor(scope: string, depth: number, limit: number) {
63+
super(`${scope} queue at capacity (${depth}/${limit})`)
64+
this.name = 'DispatchQueueFullError'
65+
}
66+
},
5867
enqueueWorkspaceDispatch: mockEnqueueWorkspaceDispatch,
5968
waitForDispatchJob: vi.fn(),
6069
}))

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@ import { validate as uuidValidate, v4 as uuidv4 } from 'uuid'
44
import { z } from 'zod'
55
import { AuthType, checkHybridAuth, hasExternalApiCredentials } from '@/lib/auth/hybrid'
66
import { admissionRejectedResponse, tryAdmit } from '@/lib/core/admission/gate'
7-
import { getJobQueue, shouldExecuteInline, shouldUseBullMQ } from '@/lib/core/async-jobs'
8-
import { createBullMQJobData } from '@/lib/core/bullmq'
7+
import { getJobQueue } from '@/lib/core/async-jobs'
8+
import { createBullMQJobData, isBullMQEnabled } from '@/lib/core/bullmq'
9+
import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
910
import {
1011
createTimeoutAbortController,
1112
getTimeoutErrorMessage,
@@ -216,7 +217,7 @@ async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextR
216217
}
217218

218219
try {
219-
const useBullMQ = shouldUseBullMQ()
220+
const useBullMQ = isBullMQEnabled()
220221
const jobQueue = useBullMQ ? null : await getJobQueue()
221222
const jobId = useBullMQ
222223
? await enqueueWorkspaceDispatch({
@@ -242,7 +243,7 @@ async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextR
242243

243244
asyncLogger.info('Queued async workflow execution', { jobId })
244245

245-
if (shouldExecuteInline() && jobQueue) {
246+
if (!isBullMQEnabled() && !isTriggerDevEnabled && jobQueue) {
246247
const inlineJobQueue = jobQueue
247248
void (async () => {
248249
try {
@@ -793,7 +794,7 @@ async function handleExecutePost(
793794

794795
const executionVariables = cachedWorkflowData?.variables ?? workflow.variables ?? {}
795796

796-
if (shouldUseBullMQ() && !INLINE_TRIGGER_TYPES.has(triggerType)) {
797+
if (isBullMQEnabled() && !INLINE_TRIGGER_TYPES.has(triggerType)) {
797798
try {
798799
const dispatchJobId = await enqueueDirectWorkflowExecution(
799800
{
@@ -993,7 +994,7 @@ async function handleExecutePost(
993994
}
994995

995996
if (shouldUseDraftState) {
996-
const shouldDispatchViaQueue = shouldUseBullMQ() && !INLINE_TRIGGER_TYPES.has(triggerType)
997+
const shouldDispatchViaQueue = isBullMQEnabled() && !INLINE_TRIGGER_TYPES.has(triggerType)
997998
if (shouldDispatchViaQueue) {
998999
const metadata: ExecutionMetadata = {
9991000
requestId,

apps/sim/background/workflow-execution.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { task } from '@trigger.dev/sdk'
33
import { v4 as uuidv4 } from 'uuid'
44
import type { AsyncExecutionCorrelation } from '@/lib/core/async-jobs/types'
55
import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits'
6+
import { ensureArray } from '@/lib/core/utils/json-sanitize'
67
import { preprocessExecution } from '@/lib/execution/preprocessing'
78
import { LoggingSession } from '@/lib/logs/execution/logging-session'
89
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
@@ -110,7 +111,7 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
110111
useDraftState: false,
111112
startTime: new Date().toISOString(),
112113
isClientSession: false,
113-
callChain: payload.callChain,
114+
callChain: ensureArray(payload.callChain),
114115
correlation,
115116
}
116117

apps/sim/executor/execution/block-executor.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ export class BlockExecutor {
148148
block,
149149
streamingExec,
150150
resolvedInputs,
151-
ctx.selectedOutputs ?? []
151+
Array.isArray(ctx.selectedOutputs) ? ctx.selectedOutputs : []
152152
)
153153
}
154154

apps/sim/executor/execution/executor.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,9 @@ export class DAGExecutor {
325325
: new Set(),
326326
workflow: this.workflow,
327327
stream: this.contextExtensions.stream ?? false,
328-
selectedOutputs: this.contextExtensions.selectedOutputs ?? [],
328+
selectedOutputs: Array.isArray(this.contextExtensions.selectedOutputs)
329+
? this.contextExtensions.selectedOutputs
330+
: [],
329331
edges: this.contextExtensions.edges ?? [],
330332
onStream: this.contextExtensions.onStream,
331333
onBlockStart: this.contextExtensions.onBlockStart,

apps/sim/executor/execution/snapshot-serializer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ export function serializePauseSnapshot(
119119
context.workflow,
120120
{},
121121
context.workflowVariables ?? {},
122-
context.selectedOutputs ?? [],
122+
Array.isArray(context.selectedOutputs) ? context.selectedOutputs : [],
123123
state
124124
)
125125

apps/sim/executor/execution/snapshot.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ export class ExecutionSnapshot {
2828
data.workflow,
2929
data.input,
3030
data.workflowVariables,
31-
data.selectedOutputs,
31+
Array.isArray(data.selectedOutputs) ? data.selectedOutputs : [],
3232
data.state
3333
)
3434
}

apps/sim/lib/core/async-jobs/config.ts

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -95,18 +95,6 @@ export async function getInlineJobQueue(): Promise<JobQueueBackend> {
9595
return cachedInlineBackend
9696
}
9797

98-
/**
99-
* Checks if jobs should be executed inline in-process.
100-
* Database fallback is the only mode that still relies on inline execution.
101-
*/
102-
export function shouldExecuteInline(): boolean {
103-
return getAsyncBackendType() === 'database'
104-
}
105-
106-
export function shouldUseBullMQ(): boolean {
107-
return isBullMQEnabled()
108-
}
109-
11098
/**
11199
* Resets the cached backend (useful for testing)
112100
*/

apps/sim/lib/core/async-jobs/index.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ export {
44
getInlineJobQueue,
55
getJobQueue,
66
resetJobQueueCache,
7-
shouldExecuteInline,
8-
shouldUseBullMQ,
97
} from './config'
108
export type {
119
AsyncBackendType,

0 commit comments

Comments
 (0)