diff --git a/packages/server/src/server/agent/agent-prompt.ts b/packages/server/src/server/agent/agent-prompt.ts index 5135c77e19..35b269158f 100644 --- a/packages/server/src/server/agent/agent-prompt.ts +++ b/packages/server/src/server/agent/agent-prompt.ts @@ -5,13 +5,18 @@ import type { AgentManager } from "./agent-manager.js"; import type { AgentStorage } from "./agent-storage.js"; import { ensureAgentLoaded } from "./agent-loading.js"; +export type AgentRunController = Pick< + AgentManager, + "tryRunOutOfBand" | "hasInFlightRun" | "replaceAgentRun" | "streamAgent" +>; + export interface StartAgentRunOptions { replaceRunning?: boolean; runOptions?: AgentRunOptions; } export function startAgentRun( - agentManager: AgentManager, + agentManager: AgentRunController, agentId: string, prompt: AgentPromptInput, logger: Logger, diff --git a/packages/server/src/server/agent/mcp-server.ts b/packages/server/src/server/agent/mcp-server.ts index 0f8f91bb79..2d221a34f5 100644 --- a/packages/server/src/server/agent/mcp-server.ts +++ b/packages/server/src/server/agent/mcp-server.ts @@ -65,6 +65,7 @@ import { waitForAgentWithTimeout, } from "./mcp-shared.js"; import { sendPromptToAgent, setupFinishNotification, startAgentRun } from "./agent-prompt.js"; +import { respondToAgentPermission } from "./permission-response.js"; import type { GitHubService } from "../../services/github-service.js"; import type { WorkspaceGitService } from "../workspace-git-service.js"; import type { CreatePaseoWorktreeInput } from "../paseo-worktree-service.js"; @@ -2086,7 +2087,13 @@ export async function createAgentMcpServer(options: AgentMcpServerOptions): Prom }, }, async ({ agentId, requestId, response }) => { - await agentManager.respondToPermission(agentId, requestId, response); + await respondToAgentPermission({ + agentManager, + agentId, + requestId, + response, + logger: childLogger, + }); return { content: [], structuredContent: ensureValidJson({ success: true }), diff --git a/packages/server/src/server/agent/permission-response.test.ts b/packages/server/src/server/agent/permission-response.test.ts new file mode 100644 index 0000000000..380f2f6dbe --- /dev/null +++ b/packages/server/src/server/agent/permission-response.test.ts @@ -0,0 +1,138 @@ +import { describe, expect, test } from "vitest"; + +import { createTestLogger } from "../../test-utils/test-logger.js"; +import type { + AgentPromptInput, + AgentPermissionResult, + AgentRunOptions, + AgentPermissionResponse, +} from "./agent-sdk-types.js"; +import type { AgentStreamEvent } from "../messages.js"; +import { respondToAgentPermission } from "./permission-response.js"; + +class FakePermissionAgentManager { + permissionResult: AgentPermissionResult | void; + hasRunInFlight = false; + outOfBandHandled = false; + permissionResponses: Array<{ + agentId: string; + requestId: string; + response: AgentPermissionResponse; + }> = []; + streamRuns: Array<{ agentId: string; prompt: AgentPromptInput; options?: AgentRunOptions }> = []; + replacementRuns: Array<{ agentId: string; prompt: AgentPromptInput; options?: AgentRunOptions }> = + []; + + async respondToPermission( + agentId: string, + requestId: string, + response: AgentPermissionResponse, + ): Promise { + this.permissionResponses.push({ agentId, requestId, response }); + return this.permissionResult; + } + + tryRunOutOfBand(): boolean { + return this.outOfBandHandled; + } + + hasInFlightRun(): boolean { + return this.hasRunInFlight; + } + + streamAgent( + agentId: string, + prompt: AgentPromptInput, + options?: AgentRunOptions, + ): AsyncGenerator { + this.streamRuns.push({ agentId, prompt, options }); + return emptyAgentStream(); + } + + replaceAgentRun( + agentId: string, + prompt: AgentPromptInput, + options?: AgentRunOptions, + ): AsyncGenerator { + this.replacementRuns.push({ agentId, prompt, options }); + return emptyAgentStream(); + } +} + +async function* emptyAgentStream(): AsyncGenerator {} + +describe("respondToAgentPermission", () => { + const logger = createTestLogger(); + + test("starts a follow-up run returned by the provider permission response", async () => { + const agentManager = new FakePermissionAgentManager(); + agentManager.permissionResult = { followUpPrompt: "implement the approved plan" }; + + await respondToAgentPermission({ + agentManager, + agentId: "agent-1", + requestId: "permission-1", + response: { behavior: "allow" }, + logger, + }); + + expect(agentManager.permissionResponses).toEqual([ + { + agentId: "agent-1", + requestId: "permission-1", + response: { behavior: "allow" }, + }, + ]); + expect(agentManager.streamRuns).toEqual([ + { + agentId: "agent-1", + prompt: "implement the approved plan", + }, + ]); + expect(agentManager.replacementRuns).toEqual([]); + }); + + test("does not start a run when the permission response has no follow-up prompt", async () => { + const agentManager = new FakePermissionAgentManager(); + + await respondToAgentPermission({ + agentManager, + agentId: "agent-1", + requestId: "permission-1", + response: { behavior: "deny", message: "not now" }, + logger, + }); + + expect(agentManager.permissionResponses).toEqual([ + { + agentId: "agent-1", + requestId: "permission-1", + response: { behavior: "deny", message: "not now" }, + }, + ]); + expect(agentManager.streamRuns).toEqual([]); + expect(agentManager.replacementRuns).toEqual([]); + }); + + test("replaces an in-flight run for follow-up prompts", async () => { + const agentManager = new FakePermissionAgentManager(); + agentManager.hasRunInFlight = true; + agentManager.permissionResult = { followUpPrompt: "continue after approval" }; + + await respondToAgentPermission({ + agentManager, + agentId: "agent-1", + requestId: "permission-1", + response: { behavior: "allow" }, + logger, + }); + + expect(agentManager.streamRuns).toEqual([]); + expect(agentManager.replacementRuns).toEqual([ + { + agentId: "agent-1", + prompt: "continue after approval", + }, + ]); + }); +}); diff --git a/packages/server/src/server/agent/permission-response.ts b/packages/server/src/server/agent/permission-response.ts new file mode 100644 index 0000000000..5d473e1287 --- /dev/null +++ b/packages/server/src/server/agent/permission-response.ts @@ -0,0 +1,40 @@ +import type { Logger } from "pino"; + +import type { AgentPermissionResponse, AgentPermissionResult } from "./agent-sdk-types.js"; +import { startAgentRun, type AgentRunController } from "./agent-prompt.js"; + +export interface PermissionResponseAgentManager extends AgentRunController { + respondToPermission( + agentId: string, + requestId: string, + response: AgentPermissionResponse, + ): Promise; +} + +export interface RespondToAgentPermissionParams { + agentManager: PermissionResponseAgentManager; + agentId: string; + requestId: string; + response: AgentPermissionResponse; + logger: Logger; +} + +export async function respondToAgentPermission( + params: RespondToAgentPermissionParams, +): Promise { + const { agentManager, agentId, requestId, response, logger } = params; + logger.debug( + { agentId, requestId }, + `Handling permission response for agent ${agentId}, request ${requestId}`, + ); + + const result = await agentManager.respondToPermission(agentId, requestId, response); + logger.debug({ agentId }, `Permission response forwarded to agent ${agentId}`); + + if (result?.followUpPrompt) { + logger.debug({ agentId }, "Permission response requires follow-up turn, starting agent stream"); + startAgentRun(agentManager, agentId, result.followUpPrompt, logger, { + replaceRunning: true, + }); + } +} diff --git a/packages/server/src/server/session.ts b/packages/server/src/server/session.ts index ab4b1cd002..5143e5951d 100644 --- a/packages/server/src/server/session.ts +++ b/packages/server/src/server/session.ts @@ -67,6 +67,7 @@ import { sendPromptToAgent, unarchiveAgentState, } from "./agent/agent-prompt.js"; +import { respondToAgentPermission } from "./agent/permission-response.js"; import { experimental_createMCPClient } from "ai"; import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js"; import type { VoiceCallerContext, VoiceSpeakHandler } from "./voice-types.js"; @@ -127,7 +128,6 @@ import type { AgentPromptInput, AgentRunOptions, AgentSessionConfig, - AgentStreamEvent, ProviderSnapshotEntry, } from "./agent/agent-sdk-types.js"; import type { StoredAgentRecord } from "./agent/agent-storage.js"; @@ -1056,52 +1056,6 @@ export class Session { return this.agentManager.hasInFlightRun(agentId); } - /** - * Start streaming an agent run and forward results via the websocket broadcast - */ - private startAgentStream( - agentId: string, - prompt: AgentPromptInput, - runOptions?: AgentRunOptions, - ): { ok: true } | { ok: false; error: string } { - this.sessionLogger.trace( - { - agentId, - promptType: typeof prompt === "string" ? "string" : "structured", - hasRunOptions: Boolean(runOptions), - }, - "startAgentStream: requested", - ); - let iterator: AsyncGenerator; - try { - const shouldReplace = this.agentManager.hasInFlightRun(agentId); - iterator = shouldReplace - ? this.agentManager.replaceAgentRun(agentId, prompt, runOptions) - : this.agentManager.streamAgent(agentId, prompt, runOptions); - this.sessionLogger.trace( - { agentId, shouldReplace }, - "startAgentStream: agent iterator returned", - ); - } catch (error) { - this.handleAgentRunError(agentId, error, "Failed to start agent run"); - return { ok: false, error: errorToFriendlyMessage(error) }; - } - - void (async () => { - try { - for await (const _ of iterator) { - // Events are forwarded via the session's AgentManager subscription. - } - this.sessionLogger.trace({ agentId }, "startAgentStream: iterator drained"); - } catch (error) { - this.sessionLogger.trace({ agentId, err: error }, "startAgentStream: iterator threw"); - this.handleAgentRunError(agentId, error, "Agent stream failed"); - } - })(); - - return { ok: true }; - } - private handleAgentRunError(agentId: string, error: unknown, context: string): void { const message = errorToFriendlyMessage(error); this.sessionLogger.error({ err: error, agentId, context }, `${context} for agent ${agentId}`); @@ -4313,22 +4267,14 @@ export class Session { requestId: string, response: AgentPermissionResponse, ): Promise { - this.sessionLogger.debug( - { agentId, requestId }, - `Handling permission response for agent ${agentId}, request ${requestId}`, - ); - try { - const result = await this.agentManager.respondToPermission(agentId, requestId, response); - this.sessionLogger.debug({ agentId }, `Permission response forwarded to agent ${agentId}`); - - if (result?.followUpPrompt) { - this.sessionLogger.debug( - { agentId }, - "Permission response requires follow-up turn, starting agent stream", - ); - this.startAgentStream(agentId, result.followUpPrompt); - } + await respondToAgentPermission({ + agentManager: this.agentManager, + agentId, + requestId, + response, + logger: this.sessionLogger, + }); } catch (error) { this.sessionLogger.error( { err: error, agentId, requestId },