Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 156 additions & 0 deletions packages/agent/src/server/agent-server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ describe("AgentServer HTTP Mode", () => {
function stubSessionCleanup(testServer: unknown): {
cleanupSession: (options?: {
completeEventStream?: boolean;
cancelPendingPermissions?: boolean;
}) => Promise<void>;
eventStreamSender: {
enqueue: ReturnType<typeof vi.fn>;
Expand All @@ -459,6 +460,7 @@ describe("AgentServer HTTP Mode", () => {
captureCheckpointState: ReturnType<typeof vi.fn>;
cleanupSession: (options?: {
completeEventStream?: boolean;
cancelPendingPermissions?: boolean;
}) => Promise<void>;
};
cleanupServer.captureCheckpointState = vi.fn(async () => {});
Expand Down Expand Up @@ -494,6 +496,160 @@ describe("AgentServer HTTP Mode", () => {
expect(testServer.eventStreamSender.stop).toHaveBeenCalledOnce();
});

// A pending choice box is drained when the session is torn down. On a real
// shutdown that's recorded as a deliberate reject; on a re-init (seamless
// resume) the user never answered, so it must be cancelled — not a reject
// the resumed turn would skip over.
it.each([
{
name: "rejects pending permissions on shutdown cleanup",
options: undefined,
expectedOutcome: { outcome: "selected", optionId: "reject" },
},
{
name: "cancels pending permissions on re-init cleanup",
options: { cancelPendingPermissions: true },
expectedOutcome: { outcome: "cancelled" },
},
])("$name", async ({ options, expectedOutcome }) => {
const testServer = stubSessionCleanup(createServer());
const pending = (
testServer as unknown as {
pendingPermissions: Map<
string,
{
resolve: (r: {
outcome: { outcome: string; optionId?: string };
}) => void;
}
>;
}
).pendingPermissions;
const resolved = new Promise<{
outcome: { outcome: string; optionId?: string };
}>((resolve) => {
pending.set("req-1", { resolve });
});

await testServer.cleanupSession(options);

await expect(resolved).resolves.toMatchObject({
outcome: expectedOutcome,
});
});

it("re-init cleans up an existing session with permissions cancelled, not rejected", async () => {
// This is the practical trigger: a session already exists (with a relayed,
// still-pending choice box) and the server re-initializes — e.g. a seamless
// resume or a reconnect that rebuilds the session. _doInitializeSession must
// route through cleanupSession with cancelPendingPermissions so the
// unanswered question is re-asked on resume rather than recorded as a reject
// and skipped. Stub cleanupSession to capture the call and short-circuit the
// (heavy, unrelated) rest of initialization.
const testServer = createServer() as unknown as {
session: unknown;
cleanupSession: (o?: {
cancelPendingPermissions?: boolean;
}) => Promise<void>;
_doInitializeSession: (payload: JwtPayload, sse: null) => Promise<void>;
};
testServer.session = { payload: { run_id: "old-run" } };
const cleanupSpy = vi
.spyOn(testServer, "cleanupSession")
.mockImplementation(async () => {
// Stop initialization right after the cleanup call we care about.
throw new Error("stop-after-cleanup");
});

const payload: JwtPayload = {
task_id: "test-task-id",
run_id: "new-run",
team_id: 1,
user_id: 1,
distinct_id: "d",
mode: "interactive",
};

await expect(
testServer._doInitializeSession(payload, null),
).rejects.toThrow("stop-after-cleanup");

expect(cleanupSpy).toHaveBeenCalledWith({
cancelPendingPermissions: true,
});

// Restore so afterEach teardown doesn't re-trigger the throwing mock.
cleanupSpy.mockRestore();
testServer.session = null;
});

it("returns a cancelled outcome to the blocked agent when a pending question is re-initialized", async () => {
// The closest we can get to the real bug in a unit test: drive the actual
// ACP requestPermission() entry point rather than poking the internal map.
// The agent raises an AskUserQuestion; with a desktop connected it relays
// and the agent's call *blocks* waiting for the user. A re-init (seamless
// resume) then tears the session down. The promise the agent is awaiting
// must resolve as "cancelled" — so the resumed turn re-asks — not a
// "reject" selection it would treat as the user's answer and skip.
//
// What this still can't cover: what the external agent process does with a
// "cancelled" outcome (re-ask vs. drop) lives in codex-acp, not here — that
// needs an e2e harness driving the real agent.
for (const key of [
"POSTHOG_CODE_INTERACTION_ORIGIN",
"CODE_INTERACTION_ORIGIN",
"TWIG_INTERACTION_ORIGIN",
]) {
delete process.env[key];
}

const testServer = stubSessionCleanup(createServer());
const session = (
testServer as unknown as { session: Record<string, unknown> }
).session;
// A connected desktop so the question relays and the call blocks.
session.sseController = { send: vi.fn(), close: vi.fn() };
session.hasDesktopConnected = true;
session.permissionMode = "default";
(session.logWriter as Record<string, unknown>).appendRawLine = vi.fn();

const payload: JwtPayload = {
task_id: "test-task-id",
run_id: "run-1",
team_id: 1,
user_id: 1,
distinct_id: "d",
mode: "interactive",
};
const client = (
testServer as unknown as {
createCloudClient: (p: JwtPayload) => {
requestPermission: (o: {
options: unknown[];
toolCall: unknown;
}) => Promise<{ outcome: { outcome: string; optionId?: string } }>;
};
}
).createCloudClient(payload);

const permission = client.requestPermission({
options: [{ kind: "allow_once", optionId: "allow", name: "Allow" }],
toolCall: { _meta: { codeToolKind: "question" } },
});
// Relayed and awaiting the user, so the call must not have settled yet.
let settled = false;
void permission.then(() => {
settled = true;
});
await Promise.resolve();
expect(settled).toBe(false);

await testServer.cleanupSession({ cancelPendingPermissions: true });

const result = await permission;
expect(result.outcome.outcome).toBe("cancelled");
});

it("writes terminal failure status before completing event ingest", async () => {
const order: string[] = [];
const testServer = new AgentServer({
Expand Down
38 changes: 25 additions & 13 deletions packages/agent/src/server/agent-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -300,10 +300,7 @@ export class AgentServer {
private pendingPermissions = new Map<
string,
{
resolve: (response: {
outcome: { outcome: "selected"; optionId: string };
_meta?: Record<string, unknown>;
}) => void;
resolve: (response: RequestPermissionResponse) => void;
toolCallId?: string;
}
>();
Expand Down Expand Up @@ -1035,7 +1032,9 @@ export class AgentServer {
sseController: SseController | null,
): Promise<void> {
if (this.session) {
await this.cleanupSession();
// Re-init, not shutdown: cancel (don't reject) any pending choice box so
// the user's unanswered question is re-asked after resume, not skipped.
await this.cleanupSession({ cancelPendingPermissions: true });
}

this.resumeState = null;
Expand Down Expand Up @@ -3148,8 +3147,10 @@ ${signedCommitInstructions}

private async cleanupSession({
completeEventStream = false,
cancelPendingPermissions = false,
}: {
completeEventStream?: boolean;
cancelPendingPermissions?: boolean;
} = {}): Promise<void> {
if (!this.session) return;

Expand All @@ -3171,11 +3172,25 @@ ${signedCommitInstructions}

// Drain pending permissions before ACP cleanup to avoid deadlocks —
// cleanup may await operations that are blocked on a permission response.
//
// On re-init (cancelPendingPermissions) resolve them as "cancelled" rather
// than a "reject" selection. The user never answered — the session is just
// being rebuilt underneath them (e.g. a seamless resume). Resolving with a
// "reject" selection records it as the user's deliberate choice, which the
// agent persists and treats as answered on resume, silently skipping the
// choice box. "cancelled" leaves it unanswered so the resumed turn re-asks.
for (const [, pending] of this.pendingPermissions) {
pending.resolve({
outcome: { outcome: "selected", optionId: "reject" },
_meta: { customInput: "Session is shutting down." },
});
pending.resolve(
cancelPendingPermissions
? {
outcome: { outcome: "cancelled" },
_meta: { message: "Session is resuming." },
}
: {
outcome: { outcome: "selected", optionId: "reject" },
_meta: { customInput: "Session is shutting down." },
},
);
}
this.pendingPermissions.clear();

Expand Down Expand Up @@ -3320,10 +3335,7 @@ ${signedCommitInstructions}
private relayPermissionToClient(params: {
options: Array<{ kind: string; optionId: string; name?: string }>;
toolCall?: Record<string, unknown> | null;
}): Promise<{
outcome: { outcome: "selected"; optionId: string };
_meta?: Record<string, unknown>;
}> {
}): Promise<RequestPermissionResponse> {
const requestId = crypto.randomUUID();
const toolCallId = params.toolCall?.toolCallId as string | undefined;

Expand Down
Loading