Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/components/Claude.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ Emits a finished payload with **session** status, **session id**, and the final
{
"data": {
"lastMessage": "Finished the requested task.",
"messages": [
"Let me check the documentation...",
"Finished the requested task."
],
"sessionId": "sess_01ExampleManagedSession",
"status": "idle"
},
Expand Down
107 changes: 96 additions & 11 deletions pkg/integrations/claude/runagent/client.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package runagent

import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -183,6 +185,15 @@ func (c *Client) GetManagedSession(sessionID string) (*ManagedSession, error) {
return &out, nil
}

func (c *Client) DeleteManagedSession(sessionID string) error {
if sessionID == "" {
return fmt.Errorf("session id is required")
}
URL := c.BaseURL + "/sessions/" + url.PathEscape(sessionID)
_, err := c.execRequestWithBeta(http.MethodDelete, URL, nil, anthropicBetaManagedAgents)
return err
}

func (c *Client) listManagedSessionEventsPage(sessionID, page string) ([]ManagedSessionEvent, string, error) {
if sessionID == "" {
return nil, "", fmt.Errorf("session id is required")
Expand All @@ -208,6 +219,91 @@ func (c *Client) listManagedSessionEventsPage(sessionID, page string) ([]Managed
return out.Data, out.NextPage, nil
}

// StreamSessionUntilIdle opens an SSE stream on the session and processes
// events in real-time. It captures all agent.message texts and returns
// the terminal status, last message, and all messages when the session completes.
func (c *Client) StreamSessionUntilIdle(ctx context.Context, sessionID string) (status string, lastMessage string, messages []string, err error) {
if sessionID == "" {
return "", "", nil, fmt.Errorf("session id is required")
}

streamURL := c.BaseURL + "/sessions/" + url.PathEscape(sessionID) + "/events/stream"
req, err := http.NewRequestWithContext(ctx, http.MethodGet, streamURL, nil)
if err != nil {
return "", "", nil, fmt.Errorf("build stream request: %w", err)
}
req.Header.Set("x-api-key", c.APIKey)
req.Header.Set("anthropic-version", anthropicVersionValue)
req.Header.Set("anthropic-beta", anthropicBetaManagedAgents)
req.Header.Set("Accept", "text/event-stream")

resp, err := c.http.Do(req)
if err != nil {
return "", "", nil, fmt.Errorf("open stream: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode >= 400 {
body, _ := io.ReadAll(resp.Body)
return "", "", nil, fmt.Errorf("stream request failed (%d): %s", resp.StatusCode, string(body))
}

scanner := bufio.NewScanner(resp.Body)
scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)

for scanner.Scan() {
if ctx.Err() != nil {
return "", lastMessage, messages, ctx.Err()
}

line := scanner.Text()
if !strings.HasPrefix(line, "data: ") {
continue
}
payload := strings.TrimPrefix(line, "data: ")
if payload == "" {
continue
}

var event ManagedSessionEvent
if err := json.Unmarshal([]byte(payload), &event); err != nil {
continue
}

// Capture all agent messages
if event.Type == "agent.message" || event.Type == "assistant.message" {
text := extractTextFromContent(event.Content)
if text != "" {
messages = append(messages, text)
lastMessage = text
}
}

// Terminal events
if event.Type == "session.status_idle" {
return "idle", lastMessage, messages, nil
}
if event.Type == "session.status_terminated" {
return "terminated", lastMessage, messages, nil
}
}

if err := scanner.Err(); err != nil {
return "", lastMessage, messages, fmt.Errorf("stream read: %w", err)
}
return "", lastMessage, messages, fmt.Errorf("stream ended unexpectedly")
}

func extractTextFromContent(content []ManagedSessionContentBlock) string {
parts := make([]string, 0)
for _, block := range content {
if block.Type == "text" && strings.TrimSpace(block.Text) != "" {
parts = append(parts, block.Text)
}
}
return strings.Join(parts, "\n")
}

func (c *Client) GetLastManagedSessionAgentMessage(sessionID string) (string, []ManagedSessionEvent, error) {
seen := []ManagedSessionEvent{}
page := ""
Expand Down Expand Up @@ -325,17 +421,6 @@ func (c *Client) SendManagedSessionInterrupt(sessionID string) error {
return err
}

// DeleteManagedSession removes a session (DELETE /v1/sessions/{id}).
// The API does not allow deleting a running session without interrupting first.
func (c *Client) DeleteManagedSession(sessionID string) error {
if sessionID == "" {
return fmt.Errorf("session id is required")
}
URL := c.BaseURL + "/sessions/" + url.PathEscape(sessionID)
_, err := c.execRequestWithBeta(http.MethodDelete, URL, nil, anthropicBetaManagedAgents)
return err
}

func (c *Client) execRequestWithBeta(method, URL string, body io.Reader, beta string) ([]byte, error) {
req, err := http.NewRequest(method, URL, body)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion pkg/integrations/claude/runagent/example_output.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
"data": {
"status": "idle",
"sessionId": "sess_01ExampleManagedSession",
"lastMessage": "Finished the requested task."
"lastMessage": "Finished the requested task.",
"messages": [
"Let me check the documentation...",
"Finished the requested task."
]
},
"timestamp": "2026-04-26T12:00:00Z",
"type": "claude.runAgent.finished"
Expand Down
88 changes: 79 additions & 9 deletions pkg/integrations/claude/runagent/monitor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package runagent

import (
"context"
"fmt"
"net/http"
"time"
Expand All @@ -15,17 +16,82 @@ func (a *RunAgent) HandleWebhook(ctx core.WebhookRequestContext) (int, *core.Web
}

func (a *RunAgent) Hooks() []core.Hook {
return []core.Hook{{
Name: "poll",
Type: core.HookTypeInternal,
}}
return []core.Hook{
{Name: "poll", Type: core.HookTypeInternal},
{Name: "stream", Type: core.HookTypeInternal},
}
}

func (a *RunAgent) HandleHook(ctx core.ActionHookContext) error {
if ctx.Name == "poll" {
switch ctx.Name {
case "stream":
return a.stream(ctx)
case "poll":
return a.poll(ctx)
default:
return fmt.Errorf("unknown hook: %s", ctx.Name)
}
}

func (a *RunAgent) stream(ctx core.ActionHookContext) error {
// Guard against duplicate/retried stream actions
if ctx.ExecutionState.IsFinished() {
return nil
}

metadata := ExecutionMetadata{}
if err := mapstructure.Decode(ctx.Metadata.Get(), &metadata); err != nil {
return fmt.Errorf("failed to decode metadata: %w", err)
}
if metadata.Session.ID == "" {
return fmt.Errorf("missing session id in metadata")
Comment thread
cursor[bot] marked this conversation as resolved.
}
Comment thread
cursor[bot] marked this conversation as resolved.

client, err := NewClient(ctx.HTTP, ctx.Integration)
if err != nil {
ctx.Logger.Warnf("Stream hook: failed to create client: %v. Falling back to poll.", err)
return ctx.Requests.ScheduleActionCall("poll", map[string]any{"attempt": 1, "errors": 0}, initialPoll)
}
return fmt.Errorf("unknown hook: %s", ctx.Name)

streamCtx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
defer cancel()

status, lastMessage, messages, streamErr := client.StreamSessionUntilIdle(streamCtx, metadata.Session.ID)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stream holds DB transaction

High Severity

The new stream hook blocks on StreamSessionUntilIdle for up to thirty minutes while the node request worker still holds an open Postgres transaction. Default idle_in_transaction_session_timeout is thirty seconds, so long-lived agent runs are likely to abort mid-stream instead of completing via SSE.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 0872c09. Configure here.

if streamErr != nil {
ctx.Logger.Warnf("Stream failed for session %s: %v. Falling back to poll.", metadata.Session.ID, streamErr)
return ctx.Requests.ScheduleActionCall("poll", map[string]any{"attempt": 1, "errors": 0}, initialPoll)
}

// If the stream completed but didn't capture any agent messages,
// fall back to the events list API as a backfill.
if lastMessage == "" {
backfill, _, backfillErr := client.GetLastManagedSessionAgentMessageWithRetry(metadata.Session.ID, finalMessageReads, finalMessageDelay)
if backfillErr != nil {
ctx.Logger.Warnf("Backfill fetch failed for session %s: %v", metadata.Session.ID, backfillErr)
}
if backfill != "" {
lastMessage = backfill
messages = append(messages, backfill)
}
}

// Emit output BEFORE updating metadata or deleting session.
// If emit fails, the poll fallback can still access the session.
out := buildOutput(status, metadata.Session.ID, lastMessage, messages)
if err := ctx.ExecutionState.Emit(defaultChannel, payloadType, []any{out}); err != nil {
ctx.Logger.Warnf("Emit failed for session %s: %v. Falling back to poll.", metadata.Session.ID, err)
return ctx.Requests.ScheduleActionCall("poll", map[string]any{"attempt": 1, "errors": 0}, initialPoll)
}

metadata.Session.Status = status
_ = ctx.Metadata.Set(metadata)

// Clean up session after successful emit
if err := client.DeleteManagedSession(metadata.Session.ID); err != nil {
ctx.Logger.Warnf("Failed to delete managed session %s: %v", metadata.Session.ID, err)
}

return nil
}

func (a *RunAgent) poll(ctx core.ActionHookContext) error {
Expand Down Expand Up @@ -56,7 +122,7 @@ func (a *RunAgent) poll(ctx core.ActionHookContext) error {

if attempt > maxPollAttempts {
ctx.Logger.Errorf("Managed session %s exceeded max poll attempts", metadata.Session.ID)
out := buildOutput("timeout", metadata.Session.ID)
out := buildOutput("timeout", metadata.Session.ID, "", nil)
return ctx.ExecutionState.Emit(defaultChannel, payloadType, []any{out})
}

Expand All @@ -70,7 +136,7 @@ func (a *RunAgent) poll(ctx core.ActionHookContext) error {
errs++
if errs >= maxPollErrors {
ctx.Logger.Errorf("Managed session %s: polling failed repeatedly: %v", metadata.Session.ID, err)
out := buildOutput("error", metadata.Session.ID)
out := buildOutput("error", metadata.Session.ID, "", nil)
return ctx.ExecutionState.Emit(defaultChannel, payloadType, []any{out})
}
return a.scheduleNextPoll(ctx, attempt+1, errs)
Expand All @@ -90,7 +156,11 @@ func (a *RunAgent) poll(ctx core.ActionHookContext) error {
if err == nil && lastMessage == "" {
ctx.Logger.Warnf("No final agent message found for managed session %s. Event types: %s", metadata.Session.ID, managedSessionEventTypes(events))
}
out := buildOutput(sess.Status, metadata.Session.ID, lastMessage)
var msgs []string
if lastMessage != "" {
msgs = []string{lastMessage}
}
out := buildOutput(sess.Status, metadata.Session.ID, lastMessage, msgs)
return ctx.ExecutionState.Emit(defaultChannel, payloadType, []any{out})
}

Expand Down
26 changes: 4 additions & 22 deletions pkg/integrations/claude/runagent/run_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,28 +153,10 @@ func (a *RunAgent) Execute(ctx core.ExecutionContext) error {
return fmt.Errorf("failed to send user message: %w", err)
}

// Refresh status after work may have already progressed.
refreshed, err := client.GetManagedSession(session.ID)
if err != nil {
return fmt.Errorf("failed to get session: %w", err)
}
mergeSessionIntoMetadata(&metadata, refreshed)
_ = ctx.Metadata.Set(metadata)

if refreshed != nil && isSessionTerminal(refreshed.Status) {
lastMessage, events, err := client.GetLastManagedSessionAgentMessageWithRetry(session.ID, finalMessageReads, finalMessageDelay)
if err != nil {
ctx.Logger.Warnf("Failed to fetch final message for managed session %s: %v", session.ID, err)
}
if err == nil && lastMessage == "" {
ctx.Logger.Warnf("No final agent message found for managed session %s. Event types: %s", session.ID, managedSessionEventTypes(events))
}
out := buildOutput(refreshed.Status, session.ID, lastMessage)
return ctx.ExecutionState.Emit(defaultChannel, payloadType, []any{out})
}

ctx.Logger.Infof("Started Managed Agent session %s. Waiting for completion (polling)...", session.ID)
return ctx.Requests.ScheduleActionCall("poll", map[string]any{"attempt": 1, "errors": 0}, initialPoll)
// Don't block Execute() — it runs inside a DB transaction.
// Schedule streaming as an action call that runs outside the transaction.
ctx.Logger.Infof("Started Managed Agent session %s. Scheduling stream...", session.ID)
return ctx.Requests.ScheduleActionCall("stream", map[string]any{"attempt": 1, "errors": 0}, 0)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Zero stream schedule interval

High Severity

Execute schedules the stream hook with a 0 delay, but production ExecutionRequestContext.ScheduleActionCall rejects any interval under one second. The action errors after the session is created and the user message is sent, so the stream never runs and the run fails.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 0872c09. Configure here.

}

func (a *RunAgent) Cleanup(ctx core.SetupContext) error { return nil }
Expand Down
30 changes: 13 additions & 17 deletions pkg/integrations/claude/runagent/run_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,18 @@ func Test__RunAgent__Setup__validation(t *testing.T) {

func Test__RunAgent__Execute__syncIdle(t *testing.T) {
a := &RunAgent{}

// SSE stream that sends an agent message then goes idle
sseStream := "data: {\"type\":\"session.status_running\"}\n\n" +
"data: {\"type\":\"agent.message\",\"content\":[{\"type\":\"text\",\"text\":\"Done\"}]}\n\n" +
"data: {\"type\":\"session.status_idle\"}\n\n"

httpContext := &contexts.HTTPContext{
Responses: []*http.Response{
{StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(`{"id":"sess_1","status":"running"}`))},
{StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(`{}`))},
{StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(`{"id":"sess_1","status":"idle"}`))},
{StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(`{"data":[{"type":"user.message","content":[{"type":"text","text":"Hello"}]},{"type":"agent.message","content":[{"type":"text","text":"Done"}]}],"next_page":null}`))},
{StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(sseStream))},
{StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(`{}`))},
},
}
integrationCtx := &contexts.IntegrationContext{
Expand All @@ -73,31 +79,22 @@ func Test__RunAgent__Execute__syncIdle(t *testing.T) {

err := a.Execute(execCtx)
require.NoError(t, err)
require.True(t, executionState.Finished)
assert.Equal(t, payloadType, executionState.Type)
assert.Equal(t, "idle", executionState.Payloads[0].(map[string]any)["data"].(OutputPayload).Status)
assert.Equal(t, "Done", executionState.Payloads[0].(map[string]any)["data"].(OutputPayload).LastMessage)
assert.Equal(t, "", requestsCtx.Action)
assert.Equal(t, "stream", requestsCtx.Action)
assert.False(t, executionState.Finished)

require.Len(t, httpContext.Requests, 4)
require.Len(t, httpContext.Requests, 2)
assert.Equal(t, "POST", httpContext.Requests[0].Method)
assert.Contains(t, httpContext.Requests[0].URL.Path, "/sessions")
assert.Equal(t, anthropicBetaManagedAgents, httpContext.Requests[0].Header.Get("anthropic-beta"))
assert.Contains(t, httpContext.Requests[1].URL.Path, "/events")
assert.Equal(t, "GET", httpContext.Requests[2].Method)
assert.Equal(t, "GET", httpContext.Requests[3].Method)
assert.Contains(t, httpContext.Requests[3].URL.Path, "/events")
assert.Equal(t, "desc", httpContext.Requests[3].URL.Query().Get("order"))
assert.Equal(t, sessionEventsPageLimit, httpContext.Requests[3].URL.Query().Get("limit"))
}

func Test__RunAgent__Execute__schedulesPoll(t *testing.T) {
func Test__RunAgent__Execute__schedulesStream(t *testing.T) {
a := &RunAgent{}
httpContext := &contexts.HTTPContext{
Responses: []*http.Response{
{StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(`{"id":"sess_1","status":"running"}`))},
{StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(`{}`))},
{StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(`{"id":"sess_1","status":"running"}`))},
},
}
integrationCtx := &contexts.IntegrationContext{Configuration: map[string]any{"apiKey": "k"}}
Expand All @@ -119,8 +116,7 @@ func Test__RunAgent__Execute__schedulesPoll(t *testing.T) {
err := a.Execute(execCtx)
require.NoError(t, err)
assert.False(t, executionState.Finished)
assert.Equal(t, "poll", requestsCtx.Action)
assert.Equal(t, initialPoll, requestsCtx.Duration)
assert.Equal(t, "stream", requestsCtx.Action)
}

func Test__RunAgent__poll__terminal(t *testing.T) {
Expand Down
Loading