Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 89 additions & 16 deletions plugins/codex/scripts/lib/codex.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
* threadLabels: Map<string, string>,
* turnId: string | null,
* bufferedNotifications: AppServerNotification[],
* pendingThreadLifecycleNotifications: AppServerNotification[],
* drainingThreadLifecycleNotifications: boolean,
* completion: Promise<TurnCaptureState>,
* resolveCompletion: (state: TurnCaptureState) => void,
* rejectCompletion: (error: unknown) => void,
Expand Down Expand Up @@ -230,6 +232,8 @@ function registerThread(state, threadId, options = {}) {
if (label) {
state.threadLabels.set(threadId, label);
}

drainPendingThreadLifecycleNotifications(state);
}

function describeStartedItem(state, item) {
Expand Down Expand Up @@ -310,6 +314,8 @@ function createTurnCaptureState(threadId, options = {}) {
threadLabels: new Map(),
turnId: null,
bufferedNotifications: [],
pendingThreadLifecycleNotifications: [],
drainingThreadLifecycleNotifications: false,
completion,
resolveCompletion,
rejectCompletion,
Expand Down Expand Up @@ -550,6 +556,81 @@ function applyTurnNotification(state, message) {
}
}

function isThreadLifecycleNotification(message) {
return message.method === "thread/started" || message.method === "thread/name/updated";
}

function extractLifecycleThreadId(message) {
if (message.method === "thread/started") {
return message.params?.thread?.id ?? null;
}
if (message.method === "thread/name/updated") {
return message.params?.threadId ?? null;
}
return null;
}

function canApplyThreadLifecycleNotification(state, message) {
const threadId = extractLifecycleThreadId(message);
return Boolean(threadId && state.threadIds.has(threadId));
}

function drainPendingThreadLifecycleNotifications(state) {
if (state.drainingThreadLifecycleNotifications || state.pendingThreadLifecycleNotifications.length === 0) {
return;
}

state.drainingThreadLifecycleNotifications = true;
try {
const remaining = [];
for (const message of state.pendingThreadLifecycleNotifications) {
if (canApplyThreadLifecycleNotification(state, message)) {
applyTurnNotification(state, message);
} else {
remaining.push(message);
}
}
state.pendingThreadLifecycleNotifications = remaining;
} finally {
state.drainingThreadLifecycleNotifications = false;
}
}

function handleThreadLifecycleNotification(state, message) {
if (canApplyThreadLifecycleNotification(state, message)) {
applyTurnNotification(state, message);
return;
}

state.pendingThreadLifecycleNotifications.push(message);
}

function flushPendingThreadLifecycleNotifications(state, previousHandler) {
if (state.pendingThreadLifecycleNotifications.length === 0) {
return;
}

for (const message of state.pendingThreadLifecycleNotifications) {
if (previousHandler) {
previousHandler(message);
}
}
state.pendingThreadLifecycleNotifications.length = 0;
}

function replayBufferedNotifications(state, previousHandler) {
for (const message of state.bufferedNotifications) {
if (isThreadLifecycleNotification(message)) {
handleThreadLifecycleNotification(state, message);
} else if (belongsToTurn(state, message)) {
applyTurnNotification(state, message);
} else if (previousHandler) {
previousHandler(message);
}
}
state.bufferedNotifications.length = 0;
}

async function captureTurn(client, threadId, startRequest, options = {}) {
const state = createTurnCaptureState(threadId, options);
const previousHandler = client.notificationHandler;
Expand All @@ -560,16 +641,16 @@ async function captureTurn(client, threadId, startRequest, options = {}) {
return;
}

if (message.method === "thread/started" || message.method === "thread/name/updated") {
applyTurnNotification(state, message);
if (isThreadLifecycleNotification(message)) {
handleThreadLifecycleNotification(state, message);
return;
}

if (!belongsToTurn(state, message)) {
if (previousHandler) {
previousHandler(message);
}
return;
if (previousHandler) {
previousHandler(message);
}
return;
}

applyTurnNotification(state, message);
Expand All @@ -582,16 +663,7 @@ async function captureTurn(client, threadId, startRequest, options = {}) {
if (state.turnId) {
state.threadTurnIds.set(state.threadId, state.turnId);
}
for (const message of state.bufferedNotifications) {
if (belongsToTurn(state, message)) {
applyTurnNotification(state, message);
} else {
if (previousHandler) {
previousHandler(message);
}
}
}
state.bufferedNotifications.length = 0;
replayBufferedNotifications(state, previousHandler);

if (response.turn?.status && response.turn.status !== "inProgress") {
completeTurn(state, response.turn);
Expand All @@ -600,6 +672,7 @@ async function captureTurn(client, threadId, startRequest, options = {}) {
return await state.completion;
} finally {
clearCompletionTimer(state);
flushPendingThreadLifecycleNotifications(state, previousHandler);
client.setNotificationHandler(previousHandler ?? null);
}
}
Expand Down
51 changes: 51 additions & 0 deletions tests/codex-buffered-subagent.test.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import fs from "node:fs";
import path from "node:path";
import process from "node:process";
import test from "node:test";
import assert from "node:assert/strict";

import { buildEnv, installFakeCodex } from "./fake-codex-fixture.mjs";
import { initGitRepo, makeTempDir, run } from "./helpers.mjs";
import { runAppServerTurn } from "../plugins/codex/scripts/lib/codex.mjs";

test("runAppServerTurn replays buffered subagent lifecycle without leaking unrelated thread output", async () => {
const repo = makeTempDir();
const binDir = makeTempDir();
installFakeCodex(binDir, "with-buffered-subagent");
initGitRepo(repo);
fs.writeFileSync(path.join(repo, "README.md"), "hello\n");
run("git", ["add", "README.md"], { cwd: repo });
run("git", ["commit", "-m", "init"], { cwd: repo });

const progress = [];
const env = buildEnv(binDir);
const previousPath = process.env.PATH;
process.env.PATH = env.PATH;

try {
const result = await runAppServerTurn(repo, {
prompt: "challenge the current design",
onProgress(update) {
progress.push(typeof update === "string" ? { message: update } : update);
}
});

assert.equal(result.status, 0);
assert.equal(result.finalMessage, "Handled the requested task.\nTask prompt accepted.");

const messages = progress.map((update) => update.message ?? "").join("\n");
const logBodies = progress.map((update) => update.logBody ?? "").join("\n");

assert.match(messages, /Starting subagent design-challenger via collaboration tool: wait\./);
assert.match(messages, /Subagent design-challenger reasoning:/);
assert.match(logBodies, /The design assumes retries are harmless/);
assert.doesNotMatch(messages, /off-turn-agent/);
assert.doesNotMatch(logBodies, /Off-turn thread output should stay out of this capture\./);
} finally {
if (previousPath === undefined) {
delete process.env.PATH;
} else {
process.env.PATH = previousPath;
}
}
});
85 changes: 68 additions & 17 deletions tests/fake-codex-fixture.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -319,24 +319,75 @@ rl.on("line", (line) => {
prompt
};
saveState(state);
send({ id: message.id, result: { turn: buildTurn(turnId) } });

const payload = message.params.outputSchema && message.params.outputSchema.properties && message.params.outputSchema.properties.verdict
? structuredReviewPayload(prompt)
: taskPayload(prompt, thread.name && thread.name.startsWith("Codex Companion Task") && prompt.includes("Continue from the current thread state"));

if (
const usesSubagentFlow =
BEHAVIOR === "with-subagent" ||
BEHAVIOR === "with-late-subagent-message" ||
BEHAVIOR === "with-subagent-no-main-turn-completed"
) {
const subThread = nextThread(state, thread.cwd, true);
const subThreadRecord = ensureThread(state, subThread.id);
subThreadRecord.name = "design-challenger";
BEHAVIOR === "with-subagent-no-main-turn-completed" ||
BEHAVIOR === "with-buffered-subagent";
const buffersSubagentLifecycle = BEHAVIOR === "with-buffered-subagent";

let bufferedSubThreadRecord = null;

if (buffersSubagentLifecycle) {
const bufferedSubThread = nextThread(state, thread.cwd, true);
bufferedSubThreadRecord = ensureThread(state, bufferedSubThread.id);
bufferedSubThreadRecord.name = "design-challenger";

const foreignThread = nextThread(state, thread.cwd, true);
const foreignThreadRecord = ensureThread(state, foreignThread.id);
foreignThreadRecord.name = "off-turn-agent";
saveState(state);

const foreignTurnId = nextTurnId(state);

send({
method: "thread/started",
params: {
thread: { ...buildThread(foreignThreadRecord), name: "off-turn-agent", agentNickname: "off-turn-agent" }
}
});
send({
method: "item/completed",
params: {
threadId: foreignThread.id,
turnId: foreignTurnId,
item: {
type: "agentMessage",
id: "msg_" + foreignTurnId,
text: "Off-turn thread output should stay out of this capture.",
phase: "analysis"
}
}
});
send({
method: "thread/started",
params: {
thread: { ...buildThread(bufferedSubThreadRecord), name: "design-challenger", agentNickname: "design-challenger" }
}
});
}

send({ id: message.id, result: { turn: buildTurn(turnId) } });

if (usesSubagentFlow) {
const subThreadRecord =
bufferedSubThreadRecord ??
(() => {
const subThread = nextThread(state, thread.cwd, true);
const record = ensureThread(state, subThread.id);
record.name = "design-challenger";
saveState(state);
return record;
})();
const subTurnId = nextTurnId(state);

send({ method: "thread/started", params: { thread: { ...buildThread(subThreadRecord), name: "design-challenger", agentNickname: "design-challenger" } } });
if (!buffersSubagentLifecycle) {
send({ method: "thread/started", params: { thread: { ...buildThread(subThreadRecord), name: "design-challenger", agentNickname: "design-challenger" } } });
}
send({ method: "turn/started", params: { threadId: thread.id, turn: buildTurn(turnId) } });
send({
method: "item/started",
Expand All @@ -349,12 +400,12 @@ rl.on("line", (line) => {
tool: "wait",
status: "inProgress",
senderThreadId: thread.id,
receiverThreadIds: [subThread.id],
receiverThreadIds: [subThreadRecord.id],
prompt: "Challenge the implementation approach",
model: null,
reasoningEffort: null,
agentsStates: {
[subThread.id]: { status: "inProgress", message: "Investigating design tradeoffs" }
[subThreadRecord.id]: { status: "inProgress", message: "Investigating design tradeoffs" }
}
}
}
Expand All @@ -369,11 +420,11 @@ rl.on("line", (line) => {
}
});
}
send({ method: "turn/started", params: { threadId: subThread.id, turn: buildTurn(subTurnId) } });
send({ method: "turn/started", params: { threadId: subThreadRecord.id, turn: buildTurn(subTurnId) } });
send({
method: "item/completed",
params: {
threadId: subThread.id,
threadId: subThreadRecord.id,
turnId: subTurnId,
item: {
type: "reasoning",
Expand All @@ -386,7 +437,7 @@ rl.on("line", (line) => {
send({
method: "item/completed",
params: {
threadId: subThread.id,
threadId: subThreadRecord.id,
turnId: subTurnId,
item: {
type: "agentMessage",
Expand All @@ -396,7 +447,7 @@ rl.on("line", (line) => {
}
}
});
send({ method: "turn/completed", params: { threadId: subThread.id, turn: buildTurn(subTurnId, "completed") } });
send({ method: "turn/completed", params: { threadId: subThreadRecord.id, turn: buildTurn(subTurnId, "completed") } });
send({
method: "item/completed",
params: {
Expand All @@ -408,12 +459,12 @@ rl.on("line", (line) => {
tool: "wait",
status: "completed",
senderThreadId: thread.id,
receiverThreadIds: [subThread.id],
receiverThreadIds: [subThreadRecord.id],
prompt: "Challenge the implementation approach",
model: null,
reasoningEffort: null,
agentsStates: {
[subThread.id]: { status: "completed", message: "Finished" }
[subThreadRecord.id]: { status: "completed", message: "Finished" }
}
}
}
Expand Down