From c8ae0c48781844df6dd2d4eb9060e306f2060d15 Mon Sep 17 00:00:00 2001 From: VOIDXAI Date: Tue, 31 Mar 2026 11:38:00 +0800 Subject: [PATCH 1/2] codex: preserve buffered subagent thread labels --- plugins/codex/scripts/lib/codex.mjs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/plugins/codex/scripts/lib/codex.mjs b/plugins/codex/scripts/lib/codex.mjs index bf7e8c8..af054a5 100644 --- a/plugins/codex/scripts/lib/codex.mjs +++ b/plugins/codex/scripts/lib/codex.mjs @@ -550,6 +550,10 @@ function applyTurnNotification(state, message) { } } +function isThreadLifecycleNotification(message) { + return message.method === "thread/started" || message.method === "thread/name/updated"; +} + async function captureTurn(client, threadId, startRequest, options = {}) { const state = createTurnCaptureState(threadId, options); const previousHandler = client.notificationHandler; @@ -560,7 +564,7 @@ async function captureTurn(client, threadId, startRequest, options = {}) { return; } - if (message.method === "thread/started" || message.method === "thread/name/updated") { + if (isThreadLifecycleNotification(message)) { applyTurnNotification(state, message); return; } @@ -583,7 +587,7 @@ async function captureTurn(client, threadId, startRequest, options = {}) { state.threadTurnIds.set(state.threadId, state.turnId); } for (const message of state.bufferedNotifications) { - if (belongsToTurn(state, message)) { + if (isThreadLifecycleNotification(message) || belongsToTurn(state, message)) { applyTurnNotification(state, message); } else { if (previousHandler) { From 09b2105f6fb6dd9e845ad7f2058c3a23bfd3cc04 Mon Sep 17 00:00:00 2001 From: VOIDXAI Date: Tue, 31 Mar 2026 14:31:45 +0800 Subject: [PATCH 2/2] codex: defer unknown thread lifecycle notifications --- plugins/codex/scripts/lib/codex.mjs | 99 ++++++++++++++++++++++---- tests/codex-buffered-subagent.test.mjs | 51 +++++++++++++ tests/fake-codex-fixture.mjs | 85 +++++++++++++++++----- 3 files changed, 203 insertions(+), 32 deletions(-) create mode 100644 tests/codex-buffered-subagent.test.mjs diff --git a/plugins/codex/scripts/lib/codex.mjs b/plugins/codex/scripts/lib/codex.mjs index af054a5..c510d3c 100644 --- a/plugins/codex/scripts/lib/codex.mjs +++ b/plugins/codex/scripts/lib/codex.mjs @@ -15,6 +15,8 @@ * threadLabels: Map, * turnId: string | null, * bufferedNotifications: AppServerNotification[], + * pendingThreadLifecycleNotifications: AppServerNotification[], + * drainingThreadLifecycleNotifications: boolean, * completion: Promise, * resolveCompletion: (state: TurnCaptureState) => void, * rejectCompletion: (error: unknown) => void, @@ -230,6 +232,8 @@ function registerThread(state, threadId, options = {}) { if (label) { state.threadLabels.set(threadId, label); } + + drainPendingThreadLifecycleNotifications(state); } function describeStartedItem(state, item) { @@ -310,6 +314,8 @@ function createTurnCaptureState(threadId, options = {}) { threadLabels: new Map(), turnId: null, bufferedNotifications: [], + pendingThreadLifecycleNotifications: [], + drainingThreadLifecycleNotifications: false, completion, resolveCompletion, rejectCompletion, @@ -554,6 +560,77 @@ function isThreadLifecycleNotification(message) { return message.method === "thread/started" || message.method === "thread/name/updated"; } +function extractLifecycleThreadId(message) { + if (message.method === "thread/started") { + return message.params?.thread?.id ?? null; + } + if (message.method === "thread/name/updated") { + return message.params?.threadId ?? null; + } + return null; +} + +function canApplyThreadLifecycleNotification(state, message) { + const threadId = extractLifecycleThreadId(message); + return Boolean(threadId && state.threadIds.has(threadId)); +} + +function drainPendingThreadLifecycleNotifications(state) { + if (state.drainingThreadLifecycleNotifications || state.pendingThreadLifecycleNotifications.length === 0) { + return; + } + + state.drainingThreadLifecycleNotifications = true; + try { + const remaining = []; + for (const message of state.pendingThreadLifecycleNotifications) { + if (canApplyThreadLifecycleNotification(state, message)) { + applyTurnNotification(state, message); + } else { + remaining.push(message); + } + } + state.pendingThreadLifecycleNotifications = remaining; + } finally { + state.drainingThreadLifecycleNotifications = false; + } +} + +function handleThreadLifecycleNotification(state, message) { + if (canApplyThreadLifecycleNotification(state, message)) { + applyTurnNotification(state, message); + return; + } + + state.pendingThreadLifecycleNotifications.push(message); +} + +function flushPendingThreadLifecycleNotifications(state, previousHandler) { + if (state.pendingThreadLifecycleNotifications.length === 0) { + return; + } + + for (const message of state.pendingThreadLifecycleNotifications) { + if (previousHandler) { + previousHandler(message); + } + } + state.pendingThreadLifecycleNotifications.length = 0; +} + +function replayBufferedNotifications(state, previousHandler) { + for (const message of state.bufferedNotifications) { + if (isThreadLifecycleNotification(message)) { + handleThreadLifecycleNotification(state, message); + } else if (belongsToTurn(state, message)) { + applyTurnNotification(state, message); + } else if (previousHandler) { + previousHandler(message); + } + } + state.bufferedNotifications.length = 0; +} + async function captureTurn(client, threadId, startRequest, options = {}) { const state = createTurnCaptureState(threadId, options); const previousHandler = client.notificationHandler; @@ -565,15 +642,15 @@ async function captureTurn(client, threadId, startRequest, options = {}) { } if (isThreadLifecycleNotification(message)) { - applyTurnNotification(state, message); + handleThreadLifecycleNotification(state, message); return; } if (!belongsToTurn(state, message)) { - if (previousHandler) { - previousHandler(message); - } - return; + if (previousHandler) { + previousHandler(message); + } + return; } applyTurnNotification(state, message); @@ -586,16 +663,7 @@ async function captureTurn(client, threadId, startRequest, options = {}) { if (state.turnId) { state.threadTurnIds.set(state.threadId, state.turnId); } - for (const message of state.bufferedNotifications) { - if (isThreadLifecycleNotification(message) || belongsToTurn(state, message)) { - applyTurnNotification(state, message); - } else { - if (previousHandler) { - previousHandler(message); - } - } - } - state.bufferedNotifications.length = 0; + replayBufferedNotifications(state, previousHandler); if (response.turn?.status && response.turn.status !== "inProgress") { completeTurn(state, response.turn); @@ -604,6 +672,7 @@ async function captureTurn(client, threadId, startRequest, options = {}) { return await state.completion; } finally { clearCompletionTimer(state); + flushPendingThreadLifecycleNotifications(state, previousHandler); client.setNotificationHandler(previousHandler ?? null); } } diff --git a/tests/codex-buffered-subagent.test.mjs b/tests/codex-buffered-subagent.test.mjs new file mode 100644 index 0000000..229ac84 --- /dev/null +++ b/tests/codex-buffered-subagent.test.mjs @@ -0,0 +1,51 @@ +import fs from "node:fs"; +import path from "node:path"; +import process from "node:process"; +import test from "node:test"; +import assert from "node:assert/strict"; + +import { buildEnv, installFakeCodex } from "./fake-codex-fixture.mjs"; +import { initGitRepo, makeTempDir, run } from "./helpers.mjs"; +import { runAppServerTurn } from "../plugins/codex/scripts/lib/codex.mjs"; + +test("runAppServerTurn replays buffered subagent lifecycle without leaking unrelated thread output", async () => { + const repo = makeTempDir(); + const binDir = makeTempDir(); + installFakeCodex(binDir, "with-buffered-subagent"); + initGitRepo(repo); + fs.writeFileSync(path.join(repo, "README.md"), "hello\n"); + run("git", ["add", "README.md"], { cwd: repo }); + run("git", ["commit", "-m", "init"], { cwd: repo }); + + const progress = []; + const env = buildEnv(binDir); + const previousPath = process.env.PATH; + process.env.PATH = env.PATH; + + try { + const result = await runAppServerTurn(repo, { + prompt: "challenge the current design", + onProgress(update) { + progress.push(typeof update === "string" ? { message: update } : update); + } + }); + + assert.equal(result.status, 0); + assert.equal(result.finalMessage, "Handled the requested task.\nTask prompt accepted."); + + const messages = progress.map((update) => update.message ?? "").join("\n"); + const logBodies = progress.map((update) => update.logBody ?? "").join("\n"); + + assert.match(messages, /Starting subagent design-challenger via collaboration tool: wait\./); + assert.match(messages, /Subagent design-challenger reasoning:/); + assert.match(logBodies, /The design assumes retries are harmless/); + assert.doesNotMatch(messages, /off-turn-agent/); + assert.doesNotMatch(logBodies, /Off-turn thread output should stay out of this capture\./); + } finally { + if (previousPath === undefined) { + delete process.env.PATH; + } else { + process.env.PATH = previousPath; + } + } +}); diff --git a/tests/fake-codex-fixture.mjs b/tests/fake-codex-fixture.mjs index ac7f084..204acfe 100644 --- a/tests/fake-codex-fixture.mjs +++ b/tests/fake-codex-fixture.mjs @@ -319,24 +319,75 @@ rl.on("line", (line) => { prompt }; saveState(state); - send({ id: message.id, result: { turn: buildTurn(turnId) } }); const payload = message.params.outputSchema && message.params.outputSchema.properties && message.params.outputSchema.properties.verdict ? structuredReviewPayload(prompt) : taskPayload(prompt, thread.name && thread.name.startsWith("Codex Companion Task") && prompt.includes("Continue from the current thread state")); - - if ( + const usesSubagentFlow = BEHAVIOR === "with-subagent" || BEHAVIOR === "with-late-subagent-message" || - BEHAVIOR === "with-subagent-no-main-turn-completed" - ) { - const subThread = nextThread(state, thread.cwd, true); - const subThreadRecord = ensureThread(state, subThread.id); - subThreadRecord.name = "design-challenger"; + BEHAVIOR === "with-subagent-no-main-turn-completed" || + BEHAVIOR === "with-buffered-subagent"; + const buffersSubagentLifecycle = BEHAVIOR === "with-buffered-subagent"; + + let bufferedSubThreadRecord = null; + + if (buffersSubagentLifecycle) { + const bufferedSubThread = nextThread(state, thread.cwd, true); + bufferedSubThreadRecord = ensureThread(state, bufferedSubThread.id); + bufferedSubThreadRecord.name = "design-challenger"; + + const foreignThread = nextThread(state, thread.cwd, true); + const foreignThreadRecord = ensureThread(state, foreignThread.id); + foreignThreadRecord.name = "off-turn-agent"; saveState(state); + + const foreignTurnId = nextTurnId(state); + + send({ + method: "thread/started", + params: { + thread: { ...buildThread(foreignThreadRecord), name: "off-turn-agent", agentNickname: "off-turn-agent" } + } + }); + send({ + method: "item/completed", + params: { + threadId: foreignThread.id, + turnId: foreignTurnId, + item: { + type: "agentMessage", + id: "msg_" + foreignTurnId, + text: "Off-turn thread output should stay out of this capture.", + phase: "analysis" + } + } + }); + send({ + method: "thread/started", + params: { + thread: { ...buildThread(bufferedSubThreadRecord), name: "design-challenger", agentNickname: "design-challenger" } + } + }); + } + + send({ id: message.id, result: { turn: buildTurn(turnId) } }); + + if (usesSubagentFlow) { + const subThreadRecord = + bufferedSubThreadRecord ?? + (() => { + const subThread = nextThread(state, thread.cwd, true); + const record = ensureThread(state, subThread.id); + record.name = "design-challenger"; + saveState(state); + return record; + })(); const subTurnId = nextTurnId(state); - send({ method: "thread/started", params: { thread: { ...buildThread(subThreadRecord), name: "design-challenger", agentNickname: "design-challenger" } } }); + if (!buffersSubagentLifecycle) { + send({ method: "thread/started", params: { thread: { ...buildThread(subThreadRecord), name: "design-challenger", agentNickname: "design-challenger" } } }); + } send({ method: "turn/started", params: { threadId: thread.id, turn: buildTurn(turnId) } }); send({ method: "item/started", @@ -349,12 +400,12 @@ rl.on("line", (line) => { tool: "wait", status: "inProgress", senderThreadId: thread.id, - receiverThreadIds: [subThread.id], + receiverThreadIds: [subThreadRecord.id], prompt: "Challenge the implementation approach", model: null, reasoningEffort: null, agentsStates: { - [subThread.id]: { status: "inProgress", message: "Investigating design tradeoffs" } + [subThreadRecord.id]: { status: "inProgress", message: "Investigating design tradeoffs" } } } } @@ -369,11 +420,11 @@ rl.on("line", (line) => { } }); } - send({ method: "turn/started", params: { threadId: subThread.id, turn: buildTurn(subTurnId) } }); + send({ method: "turn/started", params: { threadId: subThreadRecord.id, turn: buildTurn(subTurnId) } }); send({ method: "item/completed", params: { - threadId: subThread.id, + threadId: subThreadRecord.id, turnId: subTurnId, item: { type: "reasoning", @@ -386,7 +437,7 @@ rl.on("line", (line) => { send({ method: "item/completed", params: { - threadId: subThread.id, + threadId: subThreadRecord.id, turnId: subTurnId, item: { type: "agentMessage", @@ -396,7 +447,7 @@ rl.on("line", (line) => { } } }); - send({ method: "turn/completed", params: { threadId: subThread.id, turn: buildTurn(subTurnId, "completed") } }); + send({ method: "turn/completed", params: { threadId: subThreadRecord.id, turn: buildTurn(subTurnId, "completed") } }); send({ method: "item/completed", params: { @@ -408,12 +459,12 @@ rl.on("line", (line) => { tool: "wait", status: "completed", senderThreadId: thread.id, - receiverThreadIds: [subThread.id], + receiverThreadIds: [subThreadRecord.id], prompt: "Challenge the implementation approach", model: null, reasoningEffort: null, agentsStates: { - [subThread.id]: { status: "completed", message: "Finished" } + [subThreadRecord.id]: { status: "completed", message: "Finished" } } } }