From d10e14c33215f76df80a403cdf639ced4076c578 Mon Sep 17 00:00:00 2001 From: Matt Pua Date: Mon, 29 Jun 2026 16:12:03 -0400 Subject: [PATCH 1/2] fix(cloud-tasks): re-ask pending question on resume instead of skipping it MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a session is re-initialized while a permission request is still pending (e.g. a seamless resume or a reconnect that rebuilds the session), `cleanupSession` drained the pending request by resolving it as a deliberate `{ outcome: "selected", optionId: "reject" }`. The agent persisted that as the user's choice, so on resume the question was treated as answered and the choice box was silently skipped — even though the user was watching and never replied. Resolve pending permissions as `{ outcome: "cancelled" }` on re-init instead. The user never answered; the session is just being rebuilt underneath them, so "cancelled" leaves the request unanswered and the resumed turn re-asks it. Real shutdown/close paths are unchanged and still record a reject. Also collapse two hand-rolled permission-response unions onto the SDK's `RequestPermissionResponse` type (single source of truth). Tests: shutdown cleanup still rejects; re-init cleanup cancels; and `_doInitializeSession` (the practical trigger) routes through cleanup with the cancel flag set. Generated-By: PostHog Code Task-Id: 14da7a0f-aa44-4b0d-89b3-52e58f66b1e8 --- .../agent/src/server/agent-server.test.ts | 108 ++++++++++++++++++ packages/agent/src/server/agent-server.ts | 38 +++--- 2 files changed, 133 insertions(+), 13 deletions(-) diff --git a/packages/agent/src/server/agent-server.test.ts b/packages/agent/src/server/agent-server.test.ts index 0df0d013d0..0b5be39128 100644 --- a/packages/agent/src/server/agent-server.test.ts +++ b/packages/agent/src/server/agent-server.test.ts @@ -494,6 +494,114 @@ describe("AgentServer HTTP Mode", () => { expect(testServer.eventStreamSender.stop).toHaveBeenCalledOnce(); }); + it("rejects pending permissions on shutdown cleanup", async () => { + const testServer = stubSessionCleanup(createServer()); + const pending = ( + testServer as unknown as { + pendingPermissions: Map< + string, + { + resolve: (r: { + outcome: { outcome: string; optionId?: string }; + }) => void; + } + >; + } + ).pendingPermissions; + const resolved = new Promise<{ + outcome: { outcome: string; optionId?: string }; + }>((resolve) => { + pending.set("req-1", { resolve }); + }); + + await testServer.cleanupSession(); + + // Shutdown: the unanswered choice box is recorded as a deliberate reject. + await expect(resolved).resolves.toMatchObject({ + outcome: { outcome: "selected", optionId: "reject" }, + }); + }); + + it("cancels (does not reject) pending permissions on re-init cleanup", async () => { + const testServer = stubSessionCleanup(createServer()); + const pending = ( + testServer as unknown as { + pendingPermissions: Map< + string, + { + resolve: (r: { + outcome: { outcome: string; optionId?: string }; + }) => void; + } + >; + } + ).pendingPermissions; + const resolved = new Promise<{ + outcome: { outcome: string; optionId?: string }; + }>((resolve) => { + pending.set("req-1", { resolve }); + }); + + await ( + testServer as unknown as { + cleanupSession: (o: { + cancelPendingPermissions?: boolean; + }) => Promise; + } + ).cleanupSession({ cancelPendingPermissions: true }); + + // Re-init (e.g. seamless resume): the user never answered, so the request + // must be cancelled — not recorded as a reject the resumed turn skips over. + await expect(resolved).resolves.toMatchObject({ + outcome: { outcome: "cancelled" }, + }); + }); + + it("re-init cleans up an existing session with permissions cancelled, not rejected", async () => { + // This is the practical trigger: a session already exists (with a relayed, + // still-pending choice box) and the server re-initializes — e.g. a seamless + // resume or a reconnect that rebuilds the session. _doInitializeSession must + // route through cleanupSession with cancelPendingPermissions so the + // unanswered question is re-asked on resume rather than recorded as a reject + // and skipped. Stub cleanupSession to capture the call and short-circuit the + // (heavy, unrelated) rest of initialization. + const testServer = createServer() as unknown as { + session: unknown; + cleanupSession: (o?: { + cancelPendingPermissions?: boolean; + }) => Promise; + _doInitializeSession: (payload: JwtPayload, sse: null) => Promise; + }; + testServer.session = { payload: { run_id: "old-run" } }; + const cleanupSpy = vi + .spyOn(testServer, "cleanupSession") + .mockImplementation(async () => { + // Stop initialization right after the cleanup call we care about. + throw new Error("stop-after-cleanup"); + }); + + const payload: JwtPayload = { + task_id: "test-task-id", + run_id: "new-run", + team_id: 1, + user_id: 1, + distinct_id: "d", + mode: "interactive", + }; + + await expect( + testServer._doInitializeSession(payload, null), + ).rejects.toThrow("stop-after-cleanup"); + + expect(cleanupSpy).toHaveBeenCalledWith({ + cancelPendingPermissions: true, + }); + + // Restore so afterEach teardown doesn't re-trigger the throwing mock. + cleanupSpy.mockRestore(); + testServer.session = null; + }); + it("writes terminal failure status before completing event ingest", async () => { const order: string[] = []; const testServer = new AgentServer({ diff --git a/packages/agent/src/server/agent-server.ts b/packages/agent/src/server/agent-server.ts index 1d5268cb81..ed182a9b10 100644 --- a/packages/agent/src/server/agent-server.ts +++ b/packages/agent/src/server/agent-server.ts @@ -300,10 +300,7 @@ export class AgentServer { private pendingPermissions = new Map< string, { - resolve: (response: { - outcome: { outcome: "selected"; optionId: string }; - _meta?: Record; - }) => void; + resolve: (response: RequestPermissionResponse) => void; toolCallId?: string; } >(); @@ -1035,7 +1032,9 @@ export class AgentServer { sseController: SseController | null, ): Promise { if (this.session) { - await this.cleanupSession(); + // Re-init, not shutdown: cancel (don't reject) any pending choice box so + // the user's unanswered question is re-asked after resume, not skipped. + await this.cleanupSession({ cancelPendingPermissions: true }); } this.resumeState = null; @@ -3148,8 +3147,10 @@ ${signedCommitInstructions} private async cleanupSession({ completeEventStream = false, + cancelPendingPermissions = false, }: { completeEventStream?: boolean; + cancelPendingPermissions?: boolean; } = {}): Promise { if (!this.session) return; @@ -3171,11 +3172,25 @@ ${signedCommitInstructions} // Drain pending permissions before ACP cleanup to avoid deadlocks — // cleanup may await operations that are blocked on a permission response. + // + // On re-init (cancelPendingPermissions) resolve them as "cancelled" rather + // than a "reject" selection. The user never answered — the session is just + // being rebuilt underneath them (e.g. a seamless resume). Resolving with a + // "reject" selection records it as the user's deliberate choice, which the + // agent persists and treats as answered on resume, silently skipping the + // choice box. "cancelled" leaves it unanswered so the resumed turn re-asks. for (const [, pending] of this.pendingPermissions) { - pending.resolve({ - outcome: { outcome: "selected", optionId: "reject" }, - _meta: { customInput: "Session is shutting down." }, - }); + pending.resolve( + cancelPendingPermissions + ? { + outcome: { outcome: "cancelled" }, + _meta: { message: "Session is resuming." }, + } + : { + outcome: { outcome: "selected", optionId: "reject" }, + _meta: { customInput: "Session is shutting down." }, + }, + ); } this.pendingPermissions.clear(); @@ -3320,10 +3335,7 @@ ${signedCommitInstructions} private relayPermissionToClient(params: { options: Array<{ kind: string; optionId: string; name?: string }>; toolCall?: Record | null; - }): Promise<{ - outcome: { outcome: "selected"; optionId: string }; - _meta?: Record; - }> { + }): Promise { const requestId = crypto.randomUUID(); const toolCallId = params.toolCall?.toolCallId as string | undefined; From 5e13ec1efb1a9f9dc5536f2957ad521b4b48c33e Mon Sep 17 00:00:00 2001 From: Matt Pua Date: Mon, 29 Jun 2026 16:46:12 -0400 Subject: [PATCH 2/2] test(cloud-tasks): strengthen pending-question resume coverage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address review feedback and close the coverage gap on the re-init cancel path: - Parameterize the two twin drain tests (shutdown rejects / re-init cancels) into a single it.each, per the repo's preference for parameterized tests. - Add cancelPendingPermissions to stubSessionCleanup's return type so tests call cleanupSession({ cancelPendingPermissions: true }) without re-casting. - Add an integration test that drives the real ACP requestPermission() entry point: the agent's question call blocks while relayed, a re-init tears the session down, and the promise the agent is awaiting resolves as "cancelled" (not a reject the resumed turn would skip). Documents the one piece a unit test can't reach — whether the external agent re-asks — which needs e2e. - Drop inline comments that just restated the integration test's header. Generated-By: PostHog Code Task-Id: 14da7a0f-aa44-4b0d-89b3-52e58f66b1e8 --- .../agent/src/server/agent-server.test.ts | 126 ++++++++++++------ 1 file changed, 87 insertions(+), 39 deletions(-) diff --git a/packages/agent/src/server/agent-server.test.ts b/packages/agent/src/server/agent-server.test.ts index 0b5be39128..e72761e616 100644 --- a/packages/agent/src/server/agent-server.test.ts +++ b/packages/agent/src/server/agent-server.test.ts @@ -444,6 +444,7 @@ describe("AgentServer HTTP Mode", () => { function stubSessionCleanup(testServer: unknown): { cleanupSession: (options?: { completeEventStream?: boolean; + cancelPendingPermissions?: boolean; }) => Promise; eventStreamSender: { enqueue: ReturnType; @@ -459,6 +460,7 @@ describe("AgentServer HTTP Mode", () => { captureCheckpointState: ReturnType; cleanupSession: (options?: { completeEventStream?: boolean; + cancelPendingPermissions?: boolean; }) => Promise; }; cleanupServer.captureCheckpointState = vi.fn(async () => {}); @@ -494,35 +496,22 @@ describe("AgentServer HTTP Mode", () => { expect(testServer.eventStreamSender.stop).toHaveBeenCalledOnce(); }); - it("rejects pending permissions on shutdown cleanup", async () => { - const testServer = stubSessionCleanup(createServer()); - const pending = ( - testServer as unknown as { - pendingPermissions: Map< - string, - { - resolve: (r: { - outcome: { outcome: string; optionId?: string }; - }) => void; - } - >; - } - ).pendingPermissions; - const resolved = new Promise<{ - outcome: { outcome: string; optionId?: string }; - }>((resolve) => { - pending.set("req-1", { resolve }); - }); - - await testServer.cleanupSession(); - - // Shutdown: the unanswered choice box is recorded as a deliberate reject. - await expect(resolved).resolves.toMatchObject({ - outcome: { outcome: "selected", optionId: "reject" }, - }); - }); - - it("cancels (does not reject) pending permissions on re-init cleanup", async () => { + // A pending choice box is drained when the session is torn down. On a real + // shutdown that's recorded as a deliberate reject; on a re-init (seamless + // resume) the user never answered, so it must be cancelled — not a reject + // the resumed turn would skip over. + it.each([ + { + name: "rejects pending permissions on shutdown cleanup", + options: undefined, + expectedOutcome: { outcome: "selected", optionId: "reject" }, + }, + { + name: "cancels pending permissions on re-init cleanup", + options: { cancelPendingPermissions: true }, + expectedOutcome: { outcome: "cancelled" }, + }, + ])("$name", async ({ options, expectedOutcome }) => { const testServer = stubSessionCleanup(createServer()); const pending = ( testServer as unknown as { @@ -542,18 +531,10 @@ describe("AgentServer HTTP Mode", () => { pending.set("req-1", { resolve }); }); - await ( - testServer as unknown as { - cleanupSession: (o: { - cancelPendingPermissions?: boolean; - }) => Promise; - } - ).cleanupSession({ cancelPendingPermissions: true }); + await testServer.cleanupSession(options); - // Re-init (e.g. seamless resume): the user never answered, so the request - // must be cancelled — not recorded as a reject the resumed turn skips over. await expect(resolved).resolves.toMatchObject({ - outcome: { outcome: "cancelled" }, + outcome: expectedOutcome, }); }); @@ -602,6 +583,73 @@ describe("AgentServer HTTP Mode", () => { testServer.session = null; }); + it("returns a cancelled outcome to the blocked agent when a pending question is re-initialized", async () => { + // The closest we can get to the real bug in a unit test: drive the actual + // ACP requestPermission() entry point rather than poking the internal map. + // The agent raises an AskUserQuestion; with a desktop connected it relays + // and the agent's call *blocks* waiting for the user. A re-init (seamless + // resume) then tears the session down. The promise the agent is awaiting + // must resolve as "cancelled" — so the resumed turn re-asks — not a + // "reject" selection it would treat as the user's answer and skip. + // + // What this still can't cover: what the external agent process does with a + // "cancelled" outcome (re-ask vs. drop) lives in codex-acp, not here — that + // needs an e2e harness driving the real agent. + for (const key of [ + "POSTHOG_CODE_INTERACTION_ORIGIN", + "CODE_INTERACTION_ORIGIN", + "TWIG_INTERACTION_ORIGIN", + ]) { + delete process.env[key]; + } + + const testServer = stubSessionCleanup(createServer()); + const session = ( + testServer as unknown as { session: Record } + ).session; + // A connected desktop so the question relays and the call blocks. + session.sseController = { send: vi.fn(), close: vi.fn() }; + session.hasDesktopConnected = true; + session.permissionMode = "default"; + (session.logWriter as Record).appendRawLine = vi.fn(); + + const payload: JwtPayload = { + task_id: "test-task-id", + run_id: "run-1", + team_id: 1, + user_id: 1, + distinct_id: "d", + mode: "interactive", + }; + const client = ( + testServer as unknown as { + createCloudClient: (p: JwtPayload) => { + requestPermission: (o: { + options: unknown[]; + toolCall: unknown; + }) => Promise<{ outcome: { outcome: string; optionId?: string } }>; + }; + } + ).createCloudClient(payload); + + const permission = client.requestPermission({ + options: [{ kind: "allow_once", optionId: "allow", name: "Allow" }], + toolCall: { _meta: { codeToolKind: "question" } }, + }); + // Relayed and awaiting the user, so the call must not have settled yet. + let settled = false; + void permission.then(() => { + settled = true; + }); + await Promise.resolve(); + expect(settled).toBe(false); + + await testServer.cleanupSession({ cancelPendingPermissions: true }); + + const result = await permission; + expect(result.outcome.outcome).toBe("cancelled"); + }); + it("writes terminal failure status before completing event ingest", async () => { const order: string[] = []; const testServer = new AgentServer({