diff --git a/docs/components/Claude.mdx b/docs/components/Claude.mdx index 8107672943..53ab32304c 100644 --- a/docs/components/Claude.mdx +++ b/docs/components/Claude.mdx @@ -47,6 +47,10 @@ Emits a finished payload with **session** status, **session id**, and the final { "data": { "lastMessage": "Finished the requested task.", + "messages": [ + "Let me check the documentation...", + "Finished the requested task." + ], "sessionId": "sess_01ExampleManagedSession", "status": "idle" }, diff --git a/pkg/integrations/claude/runagent/client.go b/pkg/integrations/claude/runagent/client.go index a7cf6cca18..7a5f09feea 100644 --- a/pkg/integrations/claude/runagent/client.go +++ b/pkg/integrations/claude/runagent/client.go @@ -1,7 +1,9 @@ package runagent import ( + "bufio" "bytes" + "context" "encoding/json" "fmt" "io" @@ -183,6 +185,15 @@ func (c *Client) GetManagedSession(sessionID string) (*ManagedSession, error) { return &out, nil } +func (c *Client) DeleteManagedSession(sessionID string) error { + if sessionID == "" { + return fmt.Errorf("session id is required") + } + URL := c.BaseURL + "/sessions/" + url.PathEscape(sessionID) + _, err := c.execRequestWithBeta(http.MethodDelete, URL, nil, anthropicBetaManagedAgents) + return err +} + func (c *Client) listManagedSessionEventsPage(sessionID, page string) ([]ManagedSessionEvent, string, error) { if sessionID == "" { return nil, "", fmt.Errorf("session id is required") @@ -208,6 +219,91 @@ func (c *Client) listManagedSessionEventsPage(sessionID, page string) ([]Managed return out.Data, out.NextPage, nil } +// StreamSessionUntilIdle opens an SSE stream on the session and processes +// events in real-time. It captures all agent.message texts and returns +// the terminal status, last message, and all messages when the session completes. +func (c *Client) StreamSessionUntilIdle(ctx context.Context, sessionID string) (status string, lastMessage string, messages []string, err error) { + if sessionID == "" { + return "", "", nil, fmt.Errorf("session id is required") + } + + streamURL := c.BaseURL + "/sessions/" + url.PathEscape(sessionID) + "/events/stream" + req, err := http.NewRequestWithContext(ctx, http.MethodGet, streamURL, nil) + if err != nil { + return "", "", nil, fmt.Errorf("build stream request: %w", err) + } + req.Header.Set("x-api-key", c.APIKey) + req.Header.Set("anthropic-version", anthropicVersionValue) + req.Header.Set("anthropic-beta", anthropicBetaManagedAgents) + req.Header.Set("Accept", "text/event-stream") + + resp, err := c.http.Do(req) + if err != nil { + return "", "", nil, fmt.Errorf("open stream: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode >= 400 { + body, _ := io.ReadAll(resp.Body) + return "", "", nil, fmt.Errorf("stream request failed (%d): %s", resp.StatusCode, string(body)) + } + + scanner := bufio.NewScanner(resp.Body) + scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) + + for scanner.Scan() { + if ctx.Err() != nil { + return "", lastMessage, messages, ctx.Err() + } + + line := scanner.Text() + if !strings.HasPrefix(line, "data: ") { + continue + } + payload := strings.TrimPrefix(line, "data: ") + if payload == "" { + continue + } + + var event ManagedSessionEvent + if err := json.Unmarshal([]byte(payload), &event); err != nil { + continue + } + + // Capture all agent messages + if event.Type == "agent.message" || event.Type == "assistant.message" { + text := extractTextFromContent(event.Content) + if text != "" { + messages = append(messages, text) + lastMessage = text + } + } + + // Terminal events + if event.Type == "session.status_idle" { + return "idle", lastMessage, messages, nil + } + if event.Type == "session.status_terminated" { + return "terminated", lastMessage, messages, nil + } + } + + if err := scanner.Err(); err != nil { + return "", lastMessage, messages, fmt.Errorf("stream read: %w", err) + } + return "", lastMessage, messages, fmt.Errorf("stream ended unexpectedly") +} + +func extractTextFromContent(content []ManagedSessionContentBlock) string { + parts := make([]string, 0) + for _, block := range content { + if block.Type == "text" && strings.TrimSpace(block.Text) != "" { + parts = append(parts, block.Text) + } + } + return strings.Join(parts, "\n") +} + func (c *Client) GetLastManagedSessionAgentMessage(sessionID string) (string, []ManagedSessionEvent, error) { seen := []ManagedSessionEvent{} page := "" @@ -325,17 +421,6 @@ func (c *Client) SendManagedSessionInterrupt(sessionID string) error { return err } -// DeleteManagedSession removes a session (DELETE /v1/sessions/{id}). -// The API does not allow deleting a running session without interrupting first. -func (c *Client) DeleteManagedSession(sessionID string) error { - if sessionID == "" { - return fmt.Errorf("session id is required") - } - URL := c.BaseURL + "/sessions/" + url.PathEscape(sessionID) - _, err := c.execRequestWithBeta(http.MethodDelete, URL, nil, anthropicBetaManagedAgents) - return err -} - func (c *Client) execRequestWithBeta(method, URL string, body io.Reader, beta string) ([]byte, error) { req, err := http.NewRequest(method, URL, body) if err != nil { diff --git a/pkg/integrations/claude/runagent/example_output.json b/pkg/integrations/claude/runagent/example_output.json index 85fc479345..ba605d5a6a 100644 --- a/pkg/integrations/claude/runagent/example_output.json +++ b/pkg/integrations/claude/runagent/example_output.json @@ -2,7 +2,11 @@ "data": { "status": "idle", "sessionId": "sess_01ExampleManagedSession", - "lastMessage": "Finished the requested task." + "lastMessage": "Finished the requested task.", + "messages": [ + "Let me check the documentation...", + "Finished the requested task." + ] }, "timestamp": "2026-04-26T12:00:00Z", "type": "claude.runAgent.finished" diff --git a/pkg/integrations/claude/runagent/monitor.go b/pkg/integrations/claude/runagent/monitor.go index a387cf6908..537e16b94d 100644 --- a/pkg/integrations/claude/runagent/monitor.go +++ b/pkg/integrations/claude/runagent/monitor.go @@ -1,6 +1,7 @@ package runagent import ( + "context" "fmt" "net/http" "time" @@ -15,17 +16,82 @@ func (a *RunAgent) HandleWebhook(ctx core.WebhookRequestContext) (int, *core.Web } func (a *RunAgent) Hooks() []core.Hook { - return []core.Hook{{ - Name: "poll", - Type: core.HookTypeInternal, - }} + return []core.Hook{ + {Name: "poll", Type: core.HookTypeInternal}, + {Name: "stream", Type: core.HookTypeInternal}, + } } func (a *RunAgent) HandleHook(ctx core.ActionHookContext) error { - if ctx.Name == "poll" { + switch ctx.Name { + case "stream": + return a.stream(ctx) + case "poll": return a.poll(ctx) + default: + return fmt.Errorf("unknown hook: %s", ctx.Name) + } +} + +func (a *RunAgent) stream(ctx core.ActionHookContext) error { + // Guard against duplicate/retried stream actions + if ctx.ExecutionState.IsFinished() { + return nil + } + + metadata := ExecutionMetadata{} + if err := mapstructure.Decode(ctx.Metadata.Get(), &metadata); err != nil { + return fmt.Errorf("failed to decode metadata: %w", err) + } + if metadata.Session.ID == "" { + return fmt.Errorf("missing session id in metadata") + } + + client, err := NewClient(ctx.HTTP, ctx.Integration) + if err != nil { + ctx.Logger.Warnf("Stream hook: failed to create client: %v. Falling back to poll.", err) + return ctx.Requests.ScheduleActionCall("poll", map[string]any{"attempt": 1, "errors": 0}, initialPoll) } - return fmt.Errorf("unknown hook: %s", ctx.Name) + + streamCtx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) + defer cancel() + + status, lastMessage, messages, streamErr := client.StreamSessionUntilIdle(streamCtx, metadata.Session.ID) + if streamErr != nil { + ctx.Logger.Warnf("Stream failed for session %s: %v. Falling back to poll.", metadata.Session.ID, streamErr) + return ctx.Requests.ScheduleActionCall("poll", map[string]any{"attempt": 1, "errors": 0}, initialPoll) + } + + // If the stream completed but didn't capture any agent messages, + // fall back to the events list API as a backfill. + if lastMessage == "" { + backfill, _, backfillErr := client.GetLastManagedSessionAgentMessageWithRetry(metadata.Session.ID, finalMessageReads, finalMessageDelay) + if backfillErr != nil { + ctx.Logger.Warnf("Backfill fetch failed for session %s: %v", metadata.Session.ID, backfillErr) + } + if backfill != "" { + lastMessage = backfill + messages = append(messages, backfill) + } + } + + // Emit output BEFORE updating metadata or deleting session. + // If emit fails, the poll fallback can still access the session. + out := buildOutput(status, metadata.Session.ID, lastMessage, messages) + if err := ctx.ExecutionState.Emit(defaultChannel, payloadType, []any{out}); err != nil { + ctx.Logger.Warnf("Emit failed for session %s: %v. Falling back to poll.", metadata.Session.ID, err) + return ctx.Requests.ScheduleActionCall("poll", map[string]any{"attempt": 1, "errors": 0}, initialPoll) + } + + metadata.Session.Status = status + _ = ctx.Metadata.Set(metadata) + + // Clean up session after successful emit + if err := client.DeleteManagedSession(metadata.Session.ID); err != nil { + ctx.Logger.Warnf("Failed to delete managed session %s: %v", metadata.Session.ID, err) + } + + return nil } func (a *RunAgent) poll(ctx core.ActionHookContext) error { @@ -56,7 +122,7 @@ func (a *RunAgent) poll(ctx core.ActionHookContext) error { if attempt > maxPollAttempts { ctx.Logger.Errorf("Managed session %s exceeded max poll attempts", metadata.Session.ID) - out := buildOutput("timeout", metadata.Session.ID) + out := buildOutput("timeout", metadata.Session.ID, "", nil) return ctx.ExecutionState.Emit(defaultChannel, payloadType, []any{out}) } @@ -70,7 +136,7 @@ func (a *RunAgent) poll(ctx core.ActionHookContext) error { errs++ if errs >= maxPollErrors { ctx.Logger.Errorf("Managed session %s: polling failed repeatedly: %v", metadata.Session.ID, err) - out := buildOutput("error", metadata.Session.ID) + out := buildOutput("error", metadata.Session.ID, "", nil) return ctx.ExecutionState.Emit(defaultChannel, payloadType, []any{out}) } return a.scheduleNextPoll(ctx, attempt+1, errs) @@ -90,7 +156,11 @@ func (a *RunAgent) poll(ctx core.ActionHookContext) error { if err == nil && lastMessage == "" { ctx.Logger.Warnf("No final agent message found for managed session %s. Event types: %s", metadata.Session.ID, managedSessionEventTypes(events)) } - out := buildOutput(sess.Status, metadata.Session.ID, lastMessage) + var msgs []string + if lastMessage != "" { + msgs = []string{lastMessage} + } + out := buildOutput(sess.Status, metadata.Session.ID, lastMessage, msgs) return ctx.ExecutionState.Emit(defaultChannel, payloadType, []any{out}) } diff --git a/pkg/integrations/claude/runagent/run_agent.go b/pkg/integrations/claude/runagent/run_agent.go index a0da50b805..c05edbd037 100644 --- a/pkg/integrations/claude/runagent/run_agent.go +++ b/pkg/integrations/claude/runagent/run_agent.go @@ -153,28 +153,10 @@ func (a *RunAgent) Execute(ctx core.ExecutionContext) error { return fmt.Errorf("failed to send user message: %w", err) } - // Refresh status after work may have already progressed. - refreshed, err := client.GetManagedSession(session.ID) - if err != nil { - return fmt.Errorf("failed to get session: %w", err) - } - mergeSessionIntoMetadata(&metadata, refreshed) - _ = ctx.Metadata.Set(metadata) - - if refreshed != nil && isSessionTerminal(refreshed.Status) { - lastMessage, events, err := client.GetLastManagedSessionAgentMessageWithRetry(session.ID, finalMessageReads, finalMessageDelay) - if err != nil { - ctx.Logger.Warnf("Failed to fetch final message for managed session %s: %v", session.ID, err) - } - if err == nil && lastMessage == "" { - ctx.Logger.Warnf("No final agent message found for managed session %s. Event types: %s", session.ID, managedSessionEventTypes(events)) - } - out := buildOutput(refreshed.Status, session.ID, lastMessage) - return ctx.ExecutionState.Emit(defaultChannel, payloadType, []any{out}) - } - - ctx.Logger.Infof("Started Managed Agent session %s. Waiting for completion (polling)...", session.ID) - return ctx.Requests.ScheduleActionCall("poll", map[string]any{"attempt": 1, "errors": 0}, initialPoll) + // Don't block Execute() — it runs inside a DB transaction. + // Schedule streaming as an action call that runs outside the transaction. + ctx.Logger.Infof("Started Managed Agent session %s. Scheduling stream...", session.ID) + return ctx.Requests.ScheduleActionCall("stream", map[string]any{"attempt": 1, "errors": 0}, 0) } func (a *RunAgent) Cleanup(ctx core.SetupContext) error { return nil } diff --git a/pkg/integrations/claude/runagent/run_agent_test.go b/pkg/integrations/claude/runagent/run_agent_test.go index 780aeb358a..37d6851464 100644 --- a/pkg/integrations/claude/runagent/run_agent_test.go +++ b/pkg/integrations/claude/runagent/run_agent_test.go @@ -45,12 +45,18 @@ func Test__RunAgent__Setup__validation(t *testing.T) { func Test__RunAgent__Execute__syncIdle(t *testing.T) { a := &RunAgent{} + + // SSE stream that sends an agent message then goes idle + sseStream := "data: {\"type\":\"session.status_running\"}\n\n" + + "data: {\"type\":\"agent.message\",\"content\":[{\"type\":\"text\",\"text\":\"Done\"}]}\n\n" + + "data: {\"type\":\"session.status_idle\"}\n\n" + httpContext := &contexts.HTTPContext{ Responses: []*http.Response{ {StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(`{"id":"sess_1","status":"running"}`))}, {StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(`{}`))}, - {StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(`{"id":"sess_1","status":"idle"}`))}, - {StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(`{"data":[{"type":"user.message","content":[{"type":"text","text":"Hello"}]},{"type":"agent.message","content":[{"type":"text","text":"Done"}]}],"next_page":null}`))}, + {StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(sseStream))}, + {StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(`{}`))}, }, } integrationCtx := &contexts.IntegrationContext{ @@ -73,31 +79,22 @@ func Test__RunAgent__Execute__syncIdle(t *testing.T) { err := a.Execute(execCtx) require.NoError(t, err) - require.True(t, executionState.Finished) - assert.Equal(t, payloadType, executionState.Type) - assert.Equal(t, "idle", executionState.Payloads[0].(map[string]any)["data"].(OutputPayload).Status) - assert.Equal(t, "Done", executionState.Payloads[0].(map[string]any)["data"].(OutputPayload).LastMessage) - assert.Equal(t, "", requestsCtx.Action) + assert.Equal(t, "stream", requestsCtx.Action) + assert.False(t, executionState.Finished) - require.Len(t, httpContext.Requests, 4) + require.Len(t, httpContext.Requests, 2) assert.Equal(t, "POST", httpContext.Requests[0].Method) assert.Contains(t, httpContext.Requests[0].URL.Path, "/sessions") assert.Equal(t, anthropicBetaManagedAgents, httpContext.Requests[0].Header.Get("anthropic-beta")) assert.Contains(t, httpContext.Requests[1].URL.Path, "/events") - assert.Equal(t, "GET", httpContext.Requests[2].Method) - assert.Equal(t, "GET", httpContext.Requests[3].Method) - assert.Contains(t, httpContext.Requests[3].URL.Path, "/events") - assert.Equal(t, "desc", httpContext.Requests[3].URL.Query().Get("order")) - assert.Equal(t, sessionEventsPageLimit, httpContext.Requests[3].URL.Query().Get("limit")) } -func Test__RunAgent__Execute__schedulesPoll(t *testing.T) { +func Test__RunAgent__Execute__schedulesStream(t *testing.T) { a := &RunAgent{} httpContext := &contexts.HTTPContext{ Responses: []*http.Response{ {StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(`{"id":"sess_1","status":"running"}`))}, {StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(`{}`))}, - {StatusCode: http.StatusOK, Body: io.NopCloser(strings.NewReader(`{"id":"sess_1","status":"running"}`))}, }, } integrationCtx := &contexts.IntegrationContext{Configuration: map[string]any{"apiKey": "k"}} @@ -119,8 +116,7 @@ func Test__RunAgent__Execute__schedulesPoll(t *testing.T) { err := a.Execute(execCtx) require.NoError(t, err) assert.False(t, executionState.Finished) - assert.Equal(t, "poll", requestsCtx.Action) - assert.Equal(t, initialPoll, requestsCtx.Duration) + assert.Equal(t, "stream", requestsCtx.Action) } func Test__RunAgent__poll__terminal(t *testing.T) { diff --git a/pkg/integrations/claude/runagent/types.go b/pkg/integrations/claude/runagent/types.go index ecb1796325..33aafc2801 100644 --- a/pkg/integrations/claude/runagent/types.go +++ b/pkg/integrations/claude/runagent/types.go @@ -38,24 +38,23 @@ type SessionMetadata struct { // OutputPayload is emitted on the default channel when the run completes. type OutputPayload struct { - Status string `json:"status"` - SessionID string `json:"sessionId"` - LastMessage string `json:"lastMessage"` + Status string `json:"status"` + SessionID string `json:"sessionId"` + LastMessage string `json:"lastMessage"` + Messages []string `json:"messages"` } func isSessionTerminal(status string) bool { return status == sessionStatusIdle || status == sessionStatusTerminated } -func buildOutput(status, sessionID string, lastMessage ...string) OutputPayload { - out := OutputPayload{ - Status: status, - SessionID: sessionID, +func buildOutput(status, sessionID string, lastMessage string, messages []string) OutputPayload { + return OutputPayload{ + Status: status, + SessionID: sessionID, + LastMessage: lastMessage, + Messages: messages, } - if len(lastMessage) > 0 { - out.LastMessage = lastMessage[0] - } - return out } func mergeSessionIntoMetadata(metadata *ExecutionMetadata, s *ManagedSession) {