Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion packages/server/src/server/agent/agent-prompt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,18 @@ import type { AgentManager } from "./agent-manager.js";
import type { AgentStorage } from "./agent-storage.js";
import { ensureAgentLoaded } from "./agent-loading.js";

export type AgentRunController = Pick<
AgentManager,
"tryRunOutOfBand" | "hasInFlightRun" | "replaceAgentRun" | "streamAgent"
>;

export interface StartAgentRunOptions {
replaceRunning?: boolean;
runOptions?: AgentRunOptions;
}

export function startAgentRun(
agentManager: AgentManager,
agentManager: AgentRunController,
agentId: string,
prompt: AgentPromptInput,
logger: Logger,
Expand Down
9 changes: 8 additions & 1 deletion packages/server/src/server/agent/mcp-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import {
waitForAgentWithTimeout,
} from "./mcp-shared.js";
import { sendPromptToAgent, setupFinishNotification, startAgentRun } from "./agent-prompt.js";
import { respondToAgentPermission } from "./permission-response.js";
import type { GitHubService } from "../../services/github-service.js";
import type { WorkspaceGitService } from "../workspace-git-service.js";
import type { CreatePaseoWorktreeInput } from "../paseo-worktree-service.js";
Expand Down Expand Up @@ -2086,7 +2087,13 @@ export async function createAgentMcpServer(options: AgentMcpServerOptions): Prom
},
},
async ({ agentId, requestId, response }) => {
await agentManager.respondToPermission(agentId, requestId, response);
await respondToAgentPermission({
agentManager,
agentId,
requestId,
response,
logger: childLogger,
});
return {
content: [],
structuredContent: ensureValidJson({ success: true }),
Expand Down
138 changes: 138 additions & 0 deletions packages/server/src/server/agent/permission-response.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import { describe, expect, test } from "vitest";

import { createTestLogger } from "../../test-utils/test-logger.js";
import type {
AgentPromptInput,
AgentPermissionResult,
AgentRunOptions,
AgentPermissionResponse,
} from "./agent-sdk-types.js";
import type { AgentStreamEvent } from "../messages.js";
import { respondToAgentPermission } from "./permission-response.js";

class FakePermissionAgentManager {
permissionResult: AgentPermissionResult | void;
hasRunInFlight = false;
outOfBandHandled = false;
permissionResponses: Array<{
agentId: string;
requestId: string;
response: AgentPermissionResponse;
}> = [];
streamRuns: Array<{ agentId: string; prompt: AgentPromptInput; options?: AgentRunOptions }> = [];
replacementRuns: Array<{ agentId: string; prompt: AgentPromptInput; options?: AgentRunOptions }> =
[];

async respondToPermission(
agentId: string,
requestId: string,
response: AgentPermissionResponse,
): Promise<AgentPermissionResult | void> {
this.permissionResponses.push({ agentId, requestId, response });
return this.permissionResult;
}

tryRunOutOfBand(): boolean {
return this.outOfBandHandled;
}

hasInFlightRun(): boolean {
return this.hasRunInFlight;
}

streamAgent(
agentId: string,
prompt: AgentPromptInput,
options?: AgentRunOptions,
): AsyncGenerator<AgentStreamEvent> {
this.streamRuns.push({ agentId, prompt, options });
return emptyAgentStream();
}

replaceAgentRun(
agentId: string,
prompt: AgentPromptInput,
options?: AgentRunOptions,
): AsyncGenerator<AgentStreamEvent> {
this.replacementRuns.push({ agentId, prompt, options });
return emptyAgentStream();
}
}

async function* emptyAgentStream(): AsyncGenerator<AgentStreamEvent> {}

describe("respondToAgentPermission", () => {
const logger = createTestLogger();

test("starts a follow-up run returned by the provider permission response", async () => {
const agentManager = new FakePermissionAgentManager();
agentManager.permissionResult = { followUpPrompt: "implement the approved plan" };

await respondToAgentPermission({
agentManager,
agentId: "agent-1",
requestId: "permission-1",
response: { behavior: "allow" },
logger,
});

expect(agentManager.permissionResponses).toEqual([
{
agentId: "agent-1",
requestId: "permission-1",
response: { behavior: "allow" },
},
]);
expect(agentManager.streamRuns).toEqual([
{
agentId: "agent-1",
prompt: "implement the approved plan",
},
]);
expect(agentManager.replacementRuns).toEqual([]);
});

test("does not start a run when the permission response has no follow-up prompt", async () => {
const agentManager = new FakePermissionAgentManager();

await respondToAgentPermission({
agentManager,
agentId: "agent-1",
requestId: "permission-1",
response: { behavior: "deny", message: "not now" },
logger,
});

expect(agentManager.permissionResponses).toEqual([
{
agentId: "agent-1",
requestId: "permission-1",
response: { behavior: "deny", message: "not now" },
},
]);
expect(agentManager.streamRuns).toEqual([]);
expect(agentManager.replacementRuns).toEqual([]);
});

test("replaces an in-flight run for follow-up prompts", async () => {
const agentManager = new FakePermissionAgentManager();
agentManager.hasRunInFlight = true;
agentManager.permissionResult = { followUpPrompt: "continue after approval" };

await respondToAgentPermission({
agentManager,
agentId: "agent-1",
requestId: "permission-1",
response: { behavior: "allow" },
logger,
});

expect(agentManager.streamRuns).toEqual([]);
expect(agentManager.replacementRuns).toEqual([
{
agentId: "agent-1",
prompt: "continue after approval",
},
]);
});
});
40 changes: 40 additions & 0 deletions packages/server/src/server/agent/permission-response.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import type { Logger } from "pino";

import type { AgentPermissionResponse, AgentPermissionResult } from "./agent-sdk-types.js";
import { startAgentRun, type AgentRunController } from "./agent-prompt.js";

export interface PermissionResponseAgentManager extends AgentRunController {
respondToPermission(
agentId: string,
requestId: string,
response: AgentPermissionResponse,
): Promise<AgentPermissionResult | void>;
}

export interface RespondToAgentPermissionParams {
agentManager: PermissionResponseAgentManager;
agentId: string;
requestId: string;
response: AgentPermissionResponse;
logger: Logger;
}

export async function respondToAgentPermission(
params: RespondToAgentPermissionParams,
): Promise<void> {
const { agentManager, agentId, requestId, response, logger } = params;
logger.debug(
{ agentId, requestId },
`Handling permission response for agent ${agentId}, request ${requestId}`,
);

const result = await agentManager.respondToPermission(agentId, requestId, response);
logger.debug({ agentId }, `Permission response forwarded to agent ${agentId}`);

if (result?.followUpPrompt) {
logger.debug({ agentId }, "Permission response requires follow-up turn, starting agent stream");
startAgentRun(agentManager, agentId, result.followUpPrompt, logger, {
replaceRunning: true,
});
}
}
70 changes: 8 additions & 62 deletions packages/server/src/server/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ import {
sendPromptToAgent,
unarchiveAgentState,
} from "./agent/agent-prompt.js";
import { respondToAgentPermission } from "./agent/permission-response.js";
import { experimental_createMCPClient } from "ai";
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
import type { VoiceCallerContext, VoiceSpeakHandler } from "./voice-types.js";
Expand Down Expand Up @@ -127,7 +128,6 @@ import type {
AgentPromptInput,
AgentRunOptions,
AgentSessionConfig,
AgentStreamEvent,
ProviderSnapshotEntry,
} from "./agent/agent-sdk-types.js";
import type { StoredAgentRecord } from "./agent/agent-storage.js";
Expand Down Expand Up @@ -1056,52 +1056,6 @@ export class Session {
return this.agentManager.hasInFlightRun(agentId);
}

/**
* Start streaming an agent run and forward results via the websocket broadcast
*/
private startAgentStream(
agentId: string,
prompt: AgentPromptInput,
runOptions?: AgentRunOptions,
): { ok: true } | { ok: false; error: string } {
this.sessionLogger.trace(
{
agentId,
promptType: typeof prompt === "string" ? "string" : "structured",
hasRunOptions: Boolean(runOptions),
},
"startAgentStream: requested",
);
let iterator: AsyncGenerator<AgentStreamEvent>;
try {
const shouldReplace = this.agentManager.hasInFlightRun(agentId);
iterator = shouldReplace
? this.agentManager.replaceAgentRun(agentId, prompt, runOptions)
: this.agentManager.streamAgent(agentId, prompt, runOptions);
this.sessionLogger.trace(
{ agentId, shouldReplace },
"startAgentStream: agent iterator returned",
);
} catch (error) {
this.handleAgentRunError(agentId, error, "Failed to start agent run");
return { ok: false, error: errorToFriendlyMessage(error) };
}

void (async () => {
try {
for await (const _ of iterator) {
// Events are forwarded via the session's AgentManager subscription.
}
this.sessionLogger.trace({ agentId }, "startAgentStream: iterator drained");
} catch (error) {
this.sessionLogger.trace({ agentId, err: error }, "startAgentStream: iterator threw");
this.handleAgentRunError(agentId, error, "Agent stream failed");
}
})();

return { ok: true };
}

private handleAgentRunError(agentId: string, error: unknown, context: string): void {
const message = errorToFriendlyMessage(error);
this.sessionLogger.error({ err: error, agentId, context }, `${context} for agent ${agentId}`);
Expand Down Expand Up @@ -4313,22 +4267,14 @@ export class Session {
requestId: string,
response: AgentPermissionResponse,
): Promise<void> {
this.sessionLogger.debug(
{ agentId, requestId },
`Handling permission response for agent ${agentId}, request ${requestId}`,
);

try {
const result = await this.agentManager.respondToPermission(agentId, requestId, response);
this.sessionLogger.debug({ agentId }, `Permission response forwarded to agent ${agentId}`);

if (result?.followUpPrompt) {
this.sessionLogger.debug(
{ agentId },
"Permission response requires follow-up turn, starting agent stream",
);
this.startAgentStream(agentId, result.followUpPrompt);
}
await respondToAgentPermission({
agentManager: this.agentManager,
agentId,
requestId,
response,
logger: this.sessionLogger,
});
} catch (error) {
this.sessionLogger.error(
{ err: error, agentId, requestId },
Expand Down
Loading