From 66184ae9f5e0391e30c09c62821ac407eb202dce Mon Sep 17 00:00:00 2001 From: Bender Rodriguez Date: Wed, 10 Jun 2026 12:11:58 +0000 Subject: [PATCH 01/10] feat: switch runAgent from polling to SSE streaming (#5278) Replace the poll-based completion detection with real-time SSE streaming via GET /sessions/{id}/events/stream. How it works: - After creating session and sending the user message, Execute() opens an SSE stream and processes events in real-time - agent.message events are captured as they arrive, always keeping the latest one - When session.status_idle or session.status_terminated arrives, emit the output with the captured last message - If streaming fails, fall back to the poll-based monitor This eliminates the eventual consistency issue where the events list API doesn't have agent.message events immediately after the session goes idle. In the stream, events arrive in chronological order, so we always capture all messages before the terminal event. Fixes #5278 --- pkg/integrations/claude/runagent/client.go | 87 +++++++++++++++++++ pkg/integrations/claude/runagent/run_agent.go | 32 +++---- .../claude/runagent/run_agent_test.go | 16 ++-- 3 files changed, 108 insertions(+), 27 deletions(-) diff --git a/pkg/integrations/claude/runagent/client.go b/pkg/integrations/claude/runagent/client.go index a7cf6cca18..a1e3fbb38e 100644 --- a/pkg/integrations/claude/runagent/client.go +++ b/pkg/integrations/claude/runagent/client.go @@ -1,7 +1,9 @@ package runagent import ( + "bufio" "bytes" + "context" "encoding/json" "fmt" "io" @@ -208,6 +210,91 @@ func (c *Client) listManagedSessionEventsPage(sessionID, page string) ([]Managed return out.Data, out.NextPage, nil } +// StreamSessionUntilIdle opens an SSE stream on the session and processes +// events in real-time. It captures the last agent.message text and returns +// it along with the terminal status when the session reaches idle or terminated. +func (c *Client) StreamSessionUntilIdle(ctx context.Context, sessionID string) (status string, lastMessage string, err error) { + if sessionID == "" { + return "", "", fmt.Errorf("session id is required") + } + + streamURL := c.BaseURL + "/sessions/" + url.PathEscape(sessionID) + "/events/stream" + req, err := http.NewRequestWithContext(ctx, http.MethodGet, streamURL, nil) + if err != nil { + return "", "", fmt.Errorf("build stream request: %w", err) + } + req.Header.Set("x-api-key", c.APIKey) + req.Header.Set("anthropic-version", anthropicVersionValue) + req.Header.Set("anthropic-beta", anthropicBetaManagedAgents) + req.Header.Set("Accept", "text/event-stream") + + resp, err := c.http.Do(req) + if err != nil { + return "", "", fmt.Errorf("open stream: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode >= 400 { + body, _ := io.ReadAll(resp.Body) + return "", "", fmt.Errorf("stream request failed (%d): %s", resp.StatusCode, string(body)) + } + + scanner := bufio.NewScanner(resp.Body) + scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) + + for scanner.Scan() { + if ctx.Err() != nil { + return "", lastMessage, ctx.Err() + } + + line := scanner.Text() + if !strings.HasPrefix(line, "data: ") { + continue + } + payload := strings.TrimPrefix(line, "data: ") + if payload == "" { + continue + } + + var event ManagedSessionEvent + if err := json.Unmarshal([]byte(payload), &event); err != nil { + continue + } + + // Capture agent messages — always keep the latest one + if event.Type == "agent.message" || event.Type == "assistant.message" { + text := extractTextFromContent(event.Content) + if text != "" { + lastMessage = text + } + } + + // Terminal events + if event.Type == "session.status_idle" { + return "idle", lastMessage, nil + } + if event.Type == "session.status_terminated" { + return "terminated", lastMessage, nil + } + } + + if err := scanner.Err(); err != nil { + return "", lastMessage, fmt.Errorf("stream read: %w", err) + } + // Stream ended without terminal event + return "", lastMessage, fmt.Errorf("stream ended unexpectedly") +} + +func extractTextFromContent(content []ManagedSessionContentBlock) string { + parts := make([]string, 0) + for _, block := range content { + if block.Type == "text" && strings.TrimSpace(block.Text) != "" { + parts = append(parts, block.Text) + } + } + return strings.Join(parts, "\n") +} + func (c *Client) GetLastManagedSessionAgentMessage(sessionID string) (string, []ManagedSessionEvent, error) { seen := []ManagedSessionEvent{} page := "" diff --git a/pkg/integrations/claude/runagent/run_agent.go b/pkg/integrations/claude/runagent/run_agent.go index a0da50b805..bcb4522916 100644 --- a/pkg/integrations/claude/runagent/run_agent.go +++ b/pkg/integrations/claude/runagent/run_agent.go @@ -1,6 +1,7 @@ package runagent import ( + "context" "fmt" "strings" @@ -153,28 +154,19 @@ func (a *RunAgent) Execute(ctx core.ExecutionContext) error { return fmt.Errorf("failed to send user message: %w", err) } - // Refresh status after work may have already progressed. - refreshed, err := client.GetManagedSession(session.ID) - if err != nil { - return fmt.Errorf("failed to get session: %w", err) - } - mergeSessionIntoMetadata(&metadata, refreshed) - _ = ctx.Metadata.Set(metadata) - - if refreshed != nil && isSessionTerminal(refreshed.Status) { - lastMessage, events, err := client.GetLastManagedSessionAgentMessageWithRetry(session.ID, finalMessageReads, finalMessageDelay) - if err != nil { - ctx.Logger.Warnf("Failed to fetch final message for managed session %s: %v", session.ID, err) - } - if err == nil && lastMessage == "" { - ctx.Logger.Warnf("No final agent message found for managed session %s. Event types: %s", session.ID, managedSessionEventTypes(events)) - } - out := buildOutput(refreshed.Status, session.ID, lastMessage) - return ctx.ExecutionState.Emit(defaultChannel, payloadType, []any{out}) + // Stream session events until completion. + // This captures agent messages in real-time, avoiding the eventual + // consistency issue with the events list API. + ctx.Logger.Infof("Started Managed Agent session %s. Streaming events...", session.ID) + streamCtx := context.Background() + status, lastMessage, streamErr := client.StreamSessionUntilIdle(streamCtx, session.ID) + if streamErr != nil { + ctx.Logger.Warnf("Stream failed for session %s: %v. Falling back to poll.", session.ID, streamErr) + return ctx.Requests.ScheduleActionCall("poll", map[string]any{"attempt": 1, "errors": 0}, initialPoll) } - ctx.Logger.Infof("Started Managed Agent session %s. Waiting for completion (polling)...", session.ID) - return ctx.Requests.ScheduleActionCall("poll", map[string]any{"attempt": 1, "errors": 0}, initialPoll) + out := buildOutput(status, session.ID, lastMessage) + return ctx.ExecutionState.Emit(defaultChannel, payloadType, []any{out}) } func (a *RunAgent) Cleanup(ctx core.SetupContext) error { return nil } diff --git a/pkg/integrations/claude/runagent/run_agent_test.go b/pkg/integrations/claude/runagent/run_agent_test.go index 780aeb358a..3dd744a789 100644 --- a/pkg/integrations/claude/runagent/run_agent_test.go +++ b/pkg/integrations/claude/runagent/run_agent_test.go @@ -45,12 +45,17 @@ func Test__RunAgent__Setup__validation(t *testing.T) { func Test__RunAgent__Execute__syncIdle(t *testing.T) { a := &RunAgent{} + + // SSE stream that sends an agent message then goes idle + sseStream := "data: {\"type\":\"session.status_running\"}\n\n" + + "data: {\"type\":\"agent.message\",\"content\":[{\"type\":\"text\",\"text\":\"Done\"}]}\n\n" + + "data: {\"type\":\"session.status_idle\"}\n\n" + httpContext := &contexts.HTTPContext{ Responses: []*http.Response{ {StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(`{"id":"sess_1","status":"running"}`))}, {StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(`{}`))}, - {StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(`{"id":"sess_1","status":"idle"}`))}, - {StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(`{"data":[{"type":"user.message","content":[{"type":"text","text":"Hello"}]},{"type":"agent.message","content":[{"type":"text","text":"Done"}]}],"next_page":null}`))}, + {StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(sseStream))}, }, } integrationCtx := &contexts.IntegrationContext{ @@ -79,16 +84,13 @@ func Test__RunAgent__Execute__syncIdle(t *testing.T) { assert.Equal(t, "Done", executionState.Payloads[0].(map[string]any)["data"].(OutputPayload).LastMessage) assert.Equal(t, "", requestsCtx.Action) - require.Len(t, httpContext.Requests, 4) + require.Len(t, httpContext.Requests, 3) assert.Equal(t, "POST", httpContext.Requests[0].Method) assert.Contains(t, httpContext.Requests[0].URL.Path, "/sessions") assert.Equal(t, anthropicBetaManagedAgents, httpContext.Requests[0].Header.Get("anthropic-beta")) assert.Contains(t, httpContext.Requests[1].URL.Path, "/events") assert.Equal(t, "GET", httpContext.Requests[2].Method) - assert.Equal(t, "GET", httpContext.Requests[3].Method) - assert.Contains(t, httpContext.Requests[3].URL.Path, "/events") - assert.Equal(t, "desc", httpContext.Requests[3].URL.Query().Get("order")) - assert.Equal(t, sessionEventsPageLimit, httpContext.Requests[3].URL.Query().Get("limit")) + assert.Contains(t, httpContext.Requests[2].URL.Path, "/events/stream") } func Test__RunAgent__Execute__schedulesPoll(t *testing.T) { From 29d07748ab4cf6213737bac93d7f55bf9c5c817a Mon Sep 17 00:00:00 2001 From: Bender Rodriguez Date: Wed, 10 Jun 2026 12:44:02 +0000 Subject: [PATCH 02/10] feat: include all messages in output, delete session after completion - Output payload now includes 'messages' array with all agent messages from the session (intermediate + final) - 'lastMessage' still has just the final message for convenience - Session is deleted via DELETE /sessions/{id} after completion to clean up resources on Anthropic's side - Updated tests and example output --- pkg/integrations/claude/runagent/client.go | 48 +++++++++---------- .../claude/runagent/example_output.json | 6 ++- pkg/integrations/claude/runagent/monitor.go | 6 +-- pkg/integrations/claude/runagent/run_agent.go | 9 +++- .../claude/runagent/run_agent_test.go | 5 +- pkg/integrations/claude/runagent/types.go | 21 ++++---- 6 files changed, 52 insertions(+), 43 deletions(-) diff --git a/pkg/integrations/claude/runagent/client.go b/pkg/integrations/claude/runagent/client.go index a1e3fbb38e..7a5f09feea 100644 --- a/pkg/integrations/claude/runagent/client.go +++ b/pkg/integrations/claude/runagent/client.go @@ -185,6 +185,15 @@ func (c *Client) GetManagedSession(sessionID string) (*ManagedSession, error) { return &out, nil } +func (c *Client) DeleteManagedSession(sessionID string) error { + if sessionID == "" { + return fmt.Errorf("session id is required") + } + URL := c.BaseURL + "/sessions/" + url.PathEscape(sessionID) + _, err := c.execRequestWithBeta(http.MethodDelete, URL, nil, anthropicBetaManagedAgents) + return err +} + func (c *Client) listManagedSessionEventsPage(sessionID, page string) ([]ManagedSessionEvent, string, error) { if sessionID == "" { return nil, "", fmt.Errorf("session id is required") @@ -211,17 +220,17 @@ func (c *Client) listManagedSessionEventsPage(sessionID, page string) ([]Managed } // StreamSessionUntilIdle opens an SSE stream on the session and processes -// events in real-time. It captures the last agent.message text and returns -// it along with the terminal status when the session reaches idle or terminated. -func (c *Client) StreamSessionUntilIdle(ctx context.Context, sessionID string) (status string, lastMessage string, err error) { +// events in real-time. It captures all agent.message texts and returns +// the terminal status, last message, and all messages when the session completes. +func (c *Client) StreamSessionUntilIdle(ctx context.Context, sessionID string) (status string, lastMessage string, messages []string, err error) { if sessionID == "" { - return "", "", fmt.Errorf("session id is required") + return "", "", nil, fmt.Errorf("session id is required") } streamURL := c.BaseURL + "/sessions/" + url.PathEscape(sessionID) + "/events/stream" req, err := http.NewRequestWithContext(ctx, http.MethodGet, streamURL, nil) if err != nil { - return "", "", fmt.Errorf("build stream request: %w", err) + return "", "", nil, fmt.Errorf("build stream request: %w", err) } req.Header.Set("x-api-key", c.APIKey) req.Header.Set("anthropic-version", anthropicVersionValue) @@ -230,13 +239,13 @@ func (c *Client) StreamSessionUntilIdle(ctx context.Context, sessionID string) ( resp, err := c.http.Do(req) if err != nil { - return "", "", fmt.Errorf("open stream: %w", err) + return "", "", nil, fmt.Errorf("open stream: %w", err) } defer resp.Body.Close() if resp.StatusCode >= 400 { body, _ := io.ReadAll(resp.Body) - return "", "", fmt.Errorf("stream request failed (%d): %s", resp.StatusCode, string(body)) + return "", "", nil, fmt.Errorf("stream request failed (%d): %s", resp.StatusCode, string(body)) } scanner := bufio.NewScanner(resp.Body) @@ -244,7 +253,7 @@ func (c *Client) StreamSessionUntilIdle(ctx context.Context, sessionID string) ( for scanner.Scan() { if ctx.Err() != nil { - return "", lastMessage, ctx.Err() + return "", lastMessage, messages, ctx.Err() } line := scanner.Text() @@ -261,28 +270,28 @@ func (c *Client) StreamSessionUntilIdle(ctx context.Context, sessionID string) ( continue } - // Capture agent messages — always keep the latest one + // Capture all agent messages if event.Type == "agent.message" || event.Type == "assistant.message" { text := extractTextFromContent(event.Content) if text != "" { + messages = append(messages, text) lastMessage = text } } // Terminal events if event.Type == "session.status_idle" { - return "idle", lastMessage, nil + return "idle", lastMessage, messages, nil } if event.Type == "session.status_terminated" { - return "terminated", lastMessage, nil + return "terminated", lastMessage, messages, nil } } if err := scanner.Err(); err != nil { - return "", lastMessage, fmt.Errorf("stream read: %w", err) + return "", lastMessage, messages, fmt.Errorf("stream read: %w", err) } - // Stream ended without terminal event - return "", lastMessage, fmt.Errorf("stream ended unexpectedly") + return "", lastMessage, messages, fmt.Errorf("stream ended unexpectedly") } func extractTextFromContent(content []ManagedSessionContentBlock) string { @@ -412,17 +421,6 @@ func (c *Client) SendManagedSessionInterrupt(sessionID string) error { return err } -// DeleteManagedSession removes a session (DELETE /v1/sessions/{id}). -// The API does not allow deleting a running session without interrupting first. -func (c *Client) DeleteManagedSession(sessionID string) error { - if sessionID == "" { - return fmt.Errorf("session id is required") - } - URL := c.BaseURL + "/sessions/" + url.PathEscape(sessionID) - _, err := c.execRequestWithBeta(http.MethodDelete, URL, nil, anthropicBetaManagedAgents) - return err -} - func (c *Client) execRequestWithBeta(method, URL string, body io.Reader, beta string) ([]byte, error) { req, err := http.NewRequest(method, URL, body) if err != nil { diff --git a/pkg/integrations/claude/runagent/example_output.json b/pkg/integrations/claude/runagent/example_output.json index 85fc479345..ba605d5a6a 100644 --- a/pkg/integrations/claude/runagent/example_output.json +++ b/pkg/integrations/claude/runagent/example_output.json @@ -2,7 +2,11 @@ "data": { "status": "idle", "sessionId": "sess_01ExampleManagedSession", - "lastMessage": "Finished the requested task." + "lastMessage": "Finished the requested task.", + "messages": [ + "Let me check the documentation...", + "Finished the requested task." + ] }, "timestamp": "2026-04-26T12:00:00Z", "type": "claude.runAgent.finished" diff --git a/pkg/integrations/claude/runagent/monitor.go b/pkg/integrations/claude/runagent/monitor.go index a387cf6908..0b569e8298 100644 --- a/pkg/integrations/claude/runagent/monitor.go +++ b/pkg/integrations/claude/runagent/monitor.go @@ -56,7 +56,7 @@ func (a *RunAgent) poll(ctx core.ActionHookContext) error { if attempt > maxPollAttempts { ctx.Logger.Errorf("Managed session %s exceeded max poll attempts", metadata.Session.ID) - out := buildOutput("timeout", metadata.Session.ID) + out := buildOutput("timeout", metadata.Session.ID, "", nil) return ctx.ExecutionState.Emit(defaultChannel, payloadType, []any{out}) } @@ -70,7 +70,7 @@ func (a *RunAgent) poll(ctx core.ActionHookContext) error { errs++ if errs >= maxPollErrors { ctx.Logger.Errorf("Managed session %s: polling failed repeatedly: %v", metadata.Session.ID, err) - out := buildOutput("error", metadata.Session.ID) + out := buildOutput("error", metadata.Session.ID, "", nil) return ctx.ExecutionState.Emit(defaultChannel, payloadType, []any{out}) } return a.scheduleNextPoll(ctx, attempt+1, errs) @@ -90,7 +90,7 @@ func (a *RunAgent) poll(ctx core.ActionHookContext) error { if err == nil && lastMessage == "" { ctx.Logger.Warnf("No final agent message found for managed session %s. Event types: %s", metadata.Session.ID, managedSessionEventTypes(events)) } - out := buildOutput(sess.Status, metadata.Session.ID, lastMessage) + out := buildOutput(sess.Status, metadata.Session.ID, lastMessage, nil) return ctx.ExecutionState.Emit(defaultChannel, payloadType, []any{out}) } diff --git a/pkg/integrations/claude/runagent/run_agent.go b/pkg/integrations/claude/runagent/run_agent.go index bcb4522916..8d268743b0 100644 --- a/pkg/integrations/claude/runagent/run_agent.go +++ b/pkg/integrations/claude/runagent/run_agent.go @@ -159,13 +159,18 @@ func (a *RunAgent) Execute(ctx core.ExecutionContext) error { // consistency issue with the events list API. ctx.Logger.Infof("Started Managed Agent session %s. Streaming events...", session.ID) streamCtx := context.Background() - status, lastMessage, streamErr := client.StreamSessionUntilIdle(streamCtx, session.ID) + status, lastMessage, messages, streamErr := client.StreamSessionUntilIdle(streamCtx, session.ID) if streamErr != nil { ctx.Logger.Warnf("Stream failed for session %s: %v. Falling back to poll.", session.ID, streamErr) return ctx.Requests.ScheduleActionCall("poll", map[string]any{"attempt": 1, "errors": 0}, initialPoll) } - out := buildOutput(status, session.ID, lastMessage) + // Clean up the session on Anthropic's side + if err := client.DeleteManagedSession(session.ID); err != nil { + ctx.Logger.Warnf("Failed to delete managed session %s: %v", session.ID, err) + } + + out := buildOutput(status, session.ID, lastMessage, messages) return ctx.ExecutionState.Emit(defaultChannel, payloadType, []any{out}) } diff --git a/pkg/integrations/claude/runagent/run_agent_test.go b/pkg/integrations/claude/runagent/run_agent_test.go index 3dd744a789..0ef0e81ef2 100644 --- a/pkg/integrations/claude/runagent/run_agent_test.go +++ b/pkg/integrations/claude/runagent/run_agent_test.go @@ -56,6 +56,7 @@ func Test__RunAgent__Execute__syncIdle(t *testing.T) { {StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(`{"id":"sess_1","status":"running"}`))}, {StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(`{}`))}, {StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(sseStream))}, + {StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(`{}`))}, }, } integrationCtx := &contexts.IntegrationContext{ @@ -84,13 +85,15 @@ func Test__RunAgent__Execute__syncIdle(t *testing.T) { assert.Equal(t, "Done", executionState.Payloads[0].(map[string]any)["data"].(OutputPayload).LastMessage) assert.Equal(t, "", requestsCtx.Action) - require.Len(t, httpContext.Requests, 3) + require.Len(t, httpContext.Requests, 4) assert.Equal(t, "POST", httpContext.Requests[0].Method) assert.Contains(t, httpContext.Requests[0].URL.Path, "/sessions") assert.Equal(t, anthropicBetaManagedAgents, httpContext.Requests[0].Header.Get("anthropic-beta")) assert.Contains(t, httpContext.Requests[1].URL.Path, "/events") assert.Equal(t, "GET", httpContext.Requests[2].Method) assert.Contains(t, httpContext.Requests[2].URL.Path, "/events/stream") + assert.Equal(t, "DELETE", httpContext.Requests[3].Method) + assert.Contains(t, httpContext.Requests[3].URL.Path, "/sessions/sess_1") } func Test__RunAgent__Execute__schedulesPoll(t *testing.T) { diff --git a/pkg/integrations/claude/runagent/types.go b/pkg/integrations/claude/runagent/types.go index ecb1796325..33aafc2801 100644 --- a/pkg/integrations/claude/runagent/types.go +++ b/pkg/integrations/claude/runagent/types.go @@ -38,24 +38,23 @@ type SessionMetadata struct { // OutputPayload is emitted on the default channel when the run completes. type OutputPayload struct { - Status string `json:"status"` - SessionID string `json:"sessionId"` - LastMessage string `json:"lastMessage"` + Status string `json:"status"` + SessionID string `json:"sessionId"` + LastMessage string `json:"lastMessage"` + Messages []string `json:"messages"` } func isSessionTerminal(status string) bool { return status == sessionStatusIdle || status == sessionStatusTerminated } -func buildOutput(status, sessionID string, lastMessage ...string) OutputPayload { - out := OutputPayload{ - Status: status, - SessionID: sessionID, +func buildOutput(status, sessionID string, lastMessage string, messages []string) OutputPayload { + return OutputPayload{ + Status: status, + SessionID: sessionID, + LastMessage: lastMessage, + Messages: messages, } - if len(lastMessage) > 0 { - out.LastMessage = lastMessage[0] - } - return out } func mergeSessionIntoMetadata(metadata *ExecutionMetadata, s *ManagedSession) { From 2df339575047b8d0d5f1e4ea29535ee66e00116e Mon Sep 17 00:00:00 2001 From: Bender Rodriguez Date: Wed, 10 Jun 2026 13:02:45 +0000 Subject: [PATCH 03/10] fix: address bugbot review comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. Stream uses cancellable context with 30min timeout instead of context.Background() — allows cleanup on cancellation 2. Update execution metadata with final status after stream completes — no longer shows stale 'running' state 3. HTTP timeout: streaming uses c.http.Do (same as other calls). If the 30s client timeout cuts the stream, falls back to polling. A dedicated streaming HTTP client would be a bigger infrastructure change tracked separately. --- pkg/integrations/claude/runagent/run_agent.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/integrations/claude/runagent/run_agent.go b/pkg/integrations/claude/runagent/run_agent.go index 8d268743b0..d8ab0f4289 100644 --- a/pkg/integrations/claude/runagent/run_agent.go +++ b/pkg/integrations/claude/runagent/run_agent.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "time" "github.com/google/uuid" "github.com/mitchellh/mapstructure" @@ -158,13 +159,18 @@ func (a *RunAgent) Execute(ctx core.ExecutionContext) error { // This captures agent messages in real-time, avoiding the eventual // consistency issue with the events list API. ctx.Logger.Infof("Started Managed Agent session %s. Streaming events...", session.ID) - streamCtx := context.Background() + streamCtx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) + defer cancel() status, lastMessage, messages, streamErr := client.StreamSessionUntilIdle(streamCtx, session.ID) if streamErr != nil { ctx.Logger.Warnf("Stream failed for session %s: %v. Falling back to poll.", session.ID, streamErr) return ctx.Requests.ScheduleActionCall("poll", map[string]any{"attempt": 1, "errors": 0}, initialPoll) } + // Update metadata with final status + metadata.Session.Status = status + _ = ctx.Metadata.Set(metadata) + // Clean up the session on Anthropic's side if err := client.DeleteManagedSession(session.ID); err != nil { ctx.Logger.Warnf("Failed to delete managed session %s: %v", session.ID, err) From 8bcbbea5b596268c084620b25326334b48e374aa Mon Sep 17 00:00:00 2001 From: Bender Rodriguez Date: Wed, 10 Jun 2026 13:30:37 +0000 Subject: [PATCH 04/10] fix: move streaming out of Execute() to avoid long DB transaction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Execute() runs inside a DB transaction that locks the execution row. Blocking on SSE streaming for up to 30 minutes would exhaust DB connections and block other operations. Fix: Execute() now schedules a 'stream' action call that runs outside the transaction. The stream handler opens the SSE connection, captures messages, and emits output. Falls back to 'poll' if streaming fails. Flow: 1. Execute() — creates session, sends message, schedules 'stream' 2. stream() — opens SSE, captures messages, emits output 3. poll() — fallback if streaming fails (exponential backoff) --- pkg/integrations/claude/runagent/monitor.go | 51 ++++++++++++++++--- pkg/integrations/claude/runagent/run_agent.go | 29 ++--------- .../claude/runagent/run_agent_test.go | 19 ++----- 3 files changed, 54 insertions(+), 45 deletions(-) diff --git a/pkg/integrations/claude/runagent/monitor.go b/pkg/integrations/claude/runagent/monitor.go index 0b569e8298..c73d4ca80c 100644 --- a/pkg/integrations/claude/runagent/monitor.go +++ b/pkg/integrations/claude/runagent/monitor.go @@ -1,6 +1,7 @@ package runagent import ( + "context" "fmt" "net/http" "time" @@ -15,17 +16,55 @@ func (a *RunAgent) HandleWebhook(ctx core.WebhookRequestContext) (int, *core.Web } func (a *RunAgent) Hooks() []core.Hook { - return []core.Hook{{ - Name: "poll", - Type: core.HookTypeInternal, - }} + return []core.Hook{ + {Name: "poll", Type: core.HookTypeInternal}, + {Name: "stream", Type: core.HookTypeInternal}, + } } func (a *RunAgent) HandleHook(ctx core.ActionHookContext) error { - if ctx.Name == "poll" { + switch ctx.Name { + case "stream": + return a.stream(ctx) + case "poll": return a.poll(ctx) + default: + return fmt.Errorf("unknown hook: %s", ctx.Name) + } +} + +func (a *RunAgent) stream(ctx core.ActionHookContext) error { + metadata := ExecutionMetadata{} + if err := mapstructure.Decode(ctx.Metadata, &metadata); err != nil { + return fmt.Errorf("failed to decode metadata: %w", err) + } + if metadata.Session.ID == "" { + return fmt.Errorf("missing session id in metadata") } - return fmt.Errorf("unknown hook: %s", ctx.Name) + + client, err := NewClient(ctx.HTTP, ctx.Integration) + if err != nil { + return fmt.Errorf("failed to create client: %w", err) + } + + streamCtx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) + defer cancel() + + status, lastMessage, messages, streamErr := client.StreamSessionUntilIdle(streamCtx, metadata.Session.ID) + if streamErr != nil { + ctx.Logger.Warnf("Stream failed for session %s: %v. Falling back to poll.", metadata.Session.ID, streamErr) + return ctx.Requests.ScheduleActionCall("poll", map[string]any{"attempt": 1, "errors": 0}, initialPoll) + } + + metadata.Session.Status = status + _ = ctx.Metadata.Set(metadata) + + if err := client.DeleteManagedSession(metadata.Session.ID); err != nil { + ctx.Logger.Warnf("Failed to delete managed session %s: %v", metadata.Session.ID, err) + } + + out := buildOutput(status, metadata.Session.ID, lastMessage, messages) + return ctx.ExecutionState.Emit(defaultChannel, payloadType, []any{out}) } func (a *RunAgent) poll(ctx core.ActionHookContext) error { diff --git a/pkg/integrations/claude/runagent/run_agent.go b/pkg/integrations/claude/runagent/run_agent.go index d8ab0f4289..c05edbd037 100644 --- a/pkg/integrations/claude/runagent/run_agent.go +++ b/pkg/integrations/claude/runagent/run_agent.go @@ -1,10 +1,8 @@ package runagent import ( - "context" "fmt" "strings" - "time" "github.com/google/uuid" "github.com/mitchellh/mapstructure" @@ -155,29 +153,10 @@ func (a *RunAgent) Execute(ctx core.ExecutionContext) error { return fmt.Errorf("failed to send user message: %w", err) } - // Stream session events until completion. - // This captures agent messages in real-time, avoiding the eventual - // consistency issue with the events list API. - ctx.Logger.Infof("Started Managed Agent session %s. Streaming events...", session.ID) - streamCtx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) - defer cancel() - status, lastMessage, messages, streamErr := client.StreamSessionUntilIdle(streamCtx, session.ID) - if streamErr != nil { - ctx.Logger.Warnf("Stream failed for session %s: %v. Falling back to poll.", session.ID, streamErr) - return ctx.Requests.ScheduleActionCall("poll", map[string]any{"attempt": 1, "errors": 0}, initialPoll) - } - - // Update metadata with final status - metadata.Session.Status = status - _ = ctx.Metadata.Set(metadata) - - // Clean up the session on Anthropic's side - if err := client.DeleteManagedSession(session.ID); err != nil { - ctx.Logger.Warnf("Failed to delete managed session %s: %v", session.ID, err) - } - - out := buildOutput(status, session.ID, lastMessage, messages) - return ctx.ExecutionState.Emit(defaultChannel, payloadType, []any{out}) + // Don't block Execute() — it runs inside a DB transaction. + // Schedule streaming as an action call that runs outside the transaction. + ctx.Logger.Infof("Started Managed Agent session %s. Scheduling stream...", session.ID) + return ctx.Requests.ScheduleActionCall("stream", map[string]any{"attempt": 1, "errors": 0}, 0) } func (a *RunAgent) Cleanup(ctx core.SetupContext) error { return nil } diff --git a/pkg/integrations/claude/runagent/run_agent_test.go b/pkg/integrations/claude/runagent/run_agent_test.go index 0ef0e81ef2..37d6851464 100644 --- a/pkg/integrations/claude/runagent/run_agent_test.go +++ b/pkg/integrations/claude/runagent/run_agent_test.go @@ -79,30 +79,22 @@ func Test__RunAgent__Execute__syncIdle(t *testing.T) { err := a.Execute(execCtx) require.NoError(t, err) - require.True(t, executionState.Finished) - assert.Equal(t, payloadType, executionState.Type) - assert.Equal(t, "idle", executionState.Payloads[0].(map[string]any)["data"].(OutputPayload).Status) - assert.Equal(t, "Done", executionState.Payloads[0].(map[string]any)["data"].(OutputPayload).LastMessage) - assert.Equal(t, "", requestsCtx.Action) + assert.Equal(t, "stream", requestsCtx.Action) + assert.False(t, executionState.Finished) - require.Len(t, httpContext.Requests, 4) + require.Len(t, httpContext.Requests, 2) assert.Equal(t, "POST", httpContext.Requests[0].Method) assert.Contains(t, httpContext.Requests[0].URL.Path, "/sessions") assert.Equal(t, anthropicBetaManagedAgents, httpContext.Requests[0].Header.Get("anthropic-beta")) assert.Contains(t, httpContext.Requests[1].URL.Path, "/events") - assert.Equal(t, "GET", httpContext.Requests[2].Method) - assert.Contains(t, httpContext.Requests[2].URL.Path, "/events/stream") - assert.Equal(t, "DELETE", httpContext.Requests[3].Method) - assert.Contains(t, httpContext.Requests[3].URL.Path, "/sessions/sess_1") } -func Test__RunAgent__Execute__schedulesPoll(t *testing.T) { +func Test__RunAgent__Execute__schedulesStream(t *testing.T) { a := &RunAgent{} httpContext := &contexts.HTTPContext{ Responses: []*http.Response{ {StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(`{"id":"sess_1","status":"running"}`))}, {StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(`{}`))}, - {StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(`{"id":"sess_1","status":"running"}`))}, }, } integrationCtx := &contexts.IntegrationContext{Configuration: map[string]any{"apiKey": "k"}} @@ -124,8 +116,7 @@ func Test__RunAgent__Execute__schedulesPoll(t *testing.T) { err := a.Execute(execCtx) require.NoError(t, err) assert.False(t, executionState.Finished) - assert.Equal(t, "poll", requestsCtx.Action) - assert.Equal(t, initialPoll, requestsCtx.Duration) + assert.Equal(t, "stream", requestsCtx.Action) } func Test__RunAgent__poll__terminal(t *testing.T) { From 4665f40cc49f1f816d7bb24c5ec238ed474f13ac Mon Sep 17 00:00:00 2001 From: Bender Rodriguez Date: Wed, 10 Jun 2026 13:32:10 +0000 Subject: [PATCH 05/10] docs: regenerate component docs (messages field in runAgent output) --- docs/components/Claude.mdx | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/components/Claude.mdx b/docs/components/Claude.mdx index 8107672943..53ab32304c 100644 --- a/docs/components/Claude.mdx +++ b/docs/components/Claude.mdx @@ -47,6 +47,10 @@ Emits a finished payload with **session** status, **session id**, and the final { "data": { "lastMessage": "Finished the requested task.", + "messages": [ + "Let me check the documentation...", + "Finished the requested task." + ], "sessionId": "sess_01ExampleManagedSession", "status": "idle" }, From 93469a82f69521a36e7966befef98e0d09e36077 Mon Sep 17 00:00:00 2001 From: Bender Rodriguez Date: Wed, 10 Jun 2026 14:01:07 +0000 Subject: [PATCH 06/10] fix: use ctx.Metadata.Get() in stream hook to load persisted session --- pkg/integrations/claude/runagent/monitor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/integrations/claude/runagent/monitor.go b/pkg/integrations/claude/runagent/monitor.go index c73d4ca80c..8d9d8ad94c 100644 --- a/pkg/integrations/claude/runagent/monitor.go +++ b/pkg/integrations/claude/runagent/monitor.go @@ -35,7 +35,7 @@ func (a *RunAgent) HandleHook(ctx core.ActionHookContext) error { func (a *RunAgent) stream(ctx core.ActionHookContext) error { metadata := ExecutionMetadata{} - if err := mapstructure.Decode(ctx.Metadata, &metadata); err != nil { + if err := mapstructure.Decode(ctx.Metadata.Get(), &metadata); err != nil { return fmt.Errorf("failed to decode metadata: %w", err) } if metadata.Session.ID == "" { From 7091576c02d08b9fad5cac0af4e343d81c680a25 Mon Sep 17 00:00:00 2001 From: Bender Rodriguez Date: Wed, 10 Jun 2026 14:27:58 +0000 Subject: [PATCH 07/10] fix: backfill lastMessage from events API if stream missed it If streaming completes but captures no agent.message events, fall back to GetLastManagedSessionAgentMessageWithRetry to fetch the message from the events list API. --- pkg/integrations/claude/runagent/monitor.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/pkg/integrations/claude/runagent/monitor.go b/pkg/integrations/claude/runagent/monitor.go index 8d9d8ad94c..44cf7afd99 100644 --- a/pkg/integrations/claude/runagent/monitor.go +++ b/pkg/integrations/claude/runagent/monitor.go @@ -56,6 +56,19 @@ func (a *RunAgent) stream(ctx core.ActionHookContext) error { return ctx.Requests.ScheduleActionCall("poll", map[string]any{"attempt": 1, "errors": 0}, initialPoll) } + // If the stream completed but didn't capture any agent messages, + // fall back to the events list API as a backfill. + if lastMessage == "" { + backfill, _, backfillErr := client.GetLastManagedSessionAgentMessageWithRetry(metadata.Session.ID, finalMessageReads, finalMessageDelay) + if backfillErr != nil { + ctx.Logger.Warnf("Backfill fetch failed for session %s: %v", metadata.Session.ID, backfillErr) + } + if backfill != "" { + lastMessage = backfill + messages = append(messages, backfill) + } + } + metadata.Session.Status = status _ = ctx.Metadata.Set(metadata) From 4dcf989217a44055cd91a57ec1615966e8fb123f Mon Sep 17 00:00:00 2001 From: Bender Rodriguez Date: Wed, 10 Jun 2026 14:38:48 +0000 Subject: [PATCH 08/10] fix: fall back to poll if stream hook fails to create client --- pkg/integrations/claude/runagent/monitor.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/integrations/claude/runagent/monitor.go b/pkg/integrations/claude/runagent/monitor.go index 44cf7afd99..cae38c9ff1 100644 --- a/pkg/integrations/claude/runagent/monitor.go +++ b/pkg/integrations/claude/runagent/monitor.go @@ -44,7 +44,8 @@ func (a *RunAgent) stream(ctx core.ActionHookContext) error { client, err := NewClient(ctx.HTTP, ctx.Integration) if err != nil { - return fmt.Errorf("failed to create client: %w", err) + ctx.Logger.Warnf("Stream hook: failed to create client: %v. Falling back to poll.", err) + return ctx.Requests.ScheduleActionCall("poll", map[string]any{"attempt": 1, "errors": 0}, initialPoll) } streamCtx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) From 94e7ee4b44f1bd9d1b3ea102d76435985cee8647 Mon Sep 17 00:00:00 2001 From: Bender Rodriguez Date: Wed, 10 Jun 2026 14:48:23 +0000 Subject: [PATCH 09/10] fix: populate messages array in poll path for consistent output --- pkg/integrations/claude/runagent/monitor.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/integrations/claude/runagent/monitor.go b/pkg/integrations/claude/runagent/monitor.go index cae38c9ff1..2b103691d2 100644 --- a/pkg/integrations/claude/runagent/monitor.go +++ b/pkg/integrations/claude/runagent/monitor.go @@ -143,7 +143,11 @@ func (a *RunAgent) poll(ctx core.ActionHookContext) error { if err == nil && lastMessage == "" { ctx.Logger.Warnf("No final agent message found for managed session %s. Event types: %s", metadata.Session.ID, managedSessionEventTypes(events)) } - out := buildOutput(sess.Status, metadata.Session.ID, lastMessage, nil) + var msgs []string + if lastMessage != "" { + msgs = []string{lastMessage} + } + out := buildOutput(sess.Status, metadata.Session.ID, lastMessage, msgs) return ctx.ExecutionState.Emit(defaultChannel, payloadType, []any{out}) } From 0872c097a8a9c57b22485fc8c1e9f9410de3d4e2 Mon Sep 17 00:00:00 2001 From: Bender Rodriguez Date: Wed, 10 Jun 2026 14:56:34 +0000 Subject: [PATCH 10/10] fix: reorder stream hook to emit before delete, add finished guard Two issues fixed: 1. Poll skips after stream metadata (High): The stream hook was deleting the session and writing terminal metadata BEFORE emit. If emit failed and fell back to poll, poll couldn't fetch the deleted session. Fix: emit first, then update metadata and delete session. If emit fails, session is still accessible for the poll fallback. 2. Stream hook missing finished guard (Medium): Added IsFinished() check at entry to prevent duplicate stream actions from opening unnecessary SSE connections. --- pkg/integrations/claude/runagent/monitor.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/pkg/integrations/claude/runagent/monitor.go b/pkg/integrations/claude/runagent/monitor.go index 2b103691d2..537e16b94d 100644 --- a/pkg/integrations/claude/runagent/monitor.go +++ b/pkg/integrations/claude/runagent/monitor.go @@ -34,6 +34,11 @@ func (a *RunAgent) HandleHook(ctx core.ActionHookContext) error { } func (a *RunAgent) stream(ctx core.ActionHookContext) error { + // Guard against duplicate/retried stream actions + if ctx.ExecutionState.IsFinished() { + return nil + } + metadata := ExecutionMetadata{} if err := mapstructure.Decode(ctx.Metadata.Get(), &metadata); err != nil { return fmt.Errorf("failed to decode metadata: %w", err) @@ -70,15 +75,23 @@ func (a *RunAgent) stream(ctx core.ActionHookContext) error { } } + // Emit output BEFORE updating metadata or deleting session. + // If emit fails, the poll fallback can still access the session. + out := buildOutput(status, metadata.Session.ID, lastMessage, messages) + if err := ctx.ExecutionState.Emit(defaultChannel, payloadType, []any{out}); err != nil { + ctx.Logger.Warnf("Emit failed for session %s: %v. Falling back to poll.", metadata.Session.ID, err) + return ctx.Requests.ScheduleActionCall("poll", map[string]any{"attempt": 1, "errors": 0}, initialPoll) + } + metadata.Session.Status = status _ = ctx.Metadata.Set(metadata) + // Clean up session after successful emit if err := client.DeleteManagedSession(metadata.Session.ID); err != nil { ctx.Logger.Warnf("Failed to delete managed session %s: %v", metadata.Session.ID, err) } - out := buildOutput(status, metadata.Session.ID, lastMessage, messages) - return ctx.ExecutionState.Emit(defaultChannel, payloadType, []any{out}) + return nil } func (a *RunAgent) poll(ctx core.ActionHookContext) error {