Skip to content

Commit 07716cd

Browse files
committed
fix(cjson): santization bullmq payloads accurately
1 parent 9fc22a5 commit 07716cd

7 files changed

Lines changed: 80 additions & 6 deletions

File tree

apps/sim/background/workflow-execution.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { task } from '@trigger.dev/sdk'
33
import { v4 as uuidv4 } from 'uuid'
44
import type { AsyncExecutionCorrelation } from '@/lib/core/async-jobs/types'
55
import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits'
6+
import { ensureArray } from '@/lib/core/utils/json-sanitize'
67
import { preprocessExecution } from '@/lib/execution/preprocessing'
78
import { LoggingSession } from '@/lib/logs/execution/logging-session'
89
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
@@ -110,7 +111,7 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
110111
useDraftState: false,
111112
startTime: new Date().toISOString(),
112113
isClientSession: false,
113-
callChain: payload.callChain,
114+
callChain: ensureArray(payload.callChain),
114115
correlation,
115116
}
116117

apps/sim/executor/execution/block-executor.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ export class BlockExecutor {
148148
block,
149149
streamingExec,
150150
resolvedInputs,
151-
ctx.selectedOutputs ?? []
151+
Array.isArray(ctx.selectedOutputs) ? ctx.selectedOutputs : []
152152
)
153153
}
154154

apps/sim/executor/execution/executor.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,9 @@ export class DAGExecutor {
325325
: new Set(),
326326
workflow: this.workflow,
327327
stream: this.contextExtensions.stream ?? false,
328-
selectedOutputs: this.contextExtensions.selectedOutputs ?? [],
328+
selectedOutputs: Array.isArray(this.contextExtensions.selectedOutputs)
329+
? this.contextExtensions.selectedOutputs
330+
: [],
329331
edges: this.contextExtensions.edges ?? [],
330332
onStream: this.contextExtensions.onStream,
331333
onBlockStart: this.contextExtensions.onBlockStart,

apps/sim/executor/execution/snapshot-serializer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ export function serializePauseSnapshot(
119119
context.workflow,
120120
{},
121121
context.workflowVariables ?? {},
122-
context.selectedOutputs ?? [],
122+
Array.isArray(context.selectedOutputs) ? context.selectedOutputs : [],
123123
state
124124
)
125125

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/**
2+
* Sanitization for JSON data round-tripped through Redis Lua cjson.
3+
*
4+
* Lua's cjson library cannot distinguish between empty arrays `[]` and empty objects `{}`.
5+
* Both serialize to `{}` in Lua tables. When BullMQ's internal Lua scripts touch job data,
6+
* any empty array in the payload silently becomes `{}`.
7+
*
8+
* Applied once at the worker boundary before data enters the execution engine.
9+
*/
10+
11+
/**
12+
* Returns `value` if it's an array, otherwise `[]`.
13+
*/
14+
export function ensureArray<T>(value: unknown): T[] {
15+
return Array.isArray(value) ? value : []
16+
}
17+
18+
const EXECUTION_STATE_ARRAY_FIELDS = [
19+
'executedBlocks',
20+
'blockLogs',
21+
'completedLoops',
22+
'activeExecutionPath',
23+
'pendingQueue',
24+
'remainingEdges',
25+
'completedPauseContexts',
26+
]
27+
28+
/**
29+
* Normalizes all known array fields on a BullMQ-deserialized workflow execution payload.
30+
* Mutates in place — call once before passing into the execution engine.
31+
*/
32+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
33+
export function sanitizeBullMQPayload(payload: any): void {
34+
if (!payload) return
35+
36+
payload.selectedOutputs = ensureArray(payload.selectedOutputs)
37+
38+
if (payload.metadata) {
39+
payload.metadata.callChain = ensureArray(payload.metadata.callChain)
40+
41+
if (payload.metadata.pendingBlocks !== undefined) {
42+
payload.metadata.pendingBlocks = ensureArray(payload.metadata.pendingBlocks)
43+
}
44+
45+
if (payload.metadata.workflowStateOverride?.edges !== undefined) {
46+
payload.metadata.workflowStateOverride.edges = ensureArray(
47+
payload.metadata.workflowStateOverride.edges
48+
)
49+
}
50+
}
51+
52+
if (payload.runFromBlock?.sourceSnapshot) {
53+
const state = payload.runFromBlock.sourceSnapshot
54+
for (const field of EXECUTION_STATE_ARRAY_FIELDS) {
55+
if (field in state && !Array.isArray(state[field])) {
56+
state[field] = []
57+
}
58+
}
59+
60+
if (state.dagIncomingEdges && typeof state.dagIncomingEdges === 'object') {
61+
for (const key of Object.keys(state.dagIncomingEdges)) {
62+
if (!Array.isArray(state.dagIncomingEdges[key])) {
63+
state.dagIncomingEdges[key] = []
64+
}
65+
}
66+
}
67+
}
68+
}

apps/sim/lib/workflows/executor/execute-workflow.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ export async function executeWorkflow(
7676
workflow,
7777
input,
7878
workflow.variables || {},
79-
streamConfig?.selectedOutputs || []
79+
Array.isArray(streamConfig?.selectedOutputs) ? streamConfig.selectedOutputs : []
8080
)
8181

8282
const result = await executeWorkflowCore({

apps/sim/lib/workflows/executor/queued-workflow-execution.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { createLogger } from '@sim/logger'
22
import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits'
3+
import { sanitizeBullMQPayload } from '@/lib/core/utils/json-sanitize'
34
import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/event-buffer'
45
import { LoggingSession } from '@/lib/logs/execution/logging-session'
56
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
@@ -103,12 +104,14 @@ export async function executeQueuedWorkflowJob(
103104
}
104105

105106
try {
107+
sanitizeBullMQPayload(payload)
108+
106109
const snapshot = new ExecutionSnapshot(
107110
metadata,
108111
payload.workflow,
109112
payload.input,
110113
payload.variables,
111-
Array.isArray(payload.selectedOutputs) ? payload.selectedOutputs : []
114+
payload.selectedOutputs as string[]
112115
)
113116

114117
let callbacks = {}

0 commit comments

Comments
 (0)