feat: switch runAgent from polling to SSE streaming#5280
feat: switch runAgent from polling to SSE streaming#5280bender-rodriguez-unit1 wants to merge 11 commits into
Conversation
Replace the poll-based completion detection with real-time SSE
streaming via GET /sessions/{id}/events/stream.
How it works:
- After creating session and sending the user message, Execute()
opens an SSE stream and processes events in real-time
- agent.message events are captured as they arrive, always keeping
the latest one
- When session.status_idle or session.status_terminated arrives,
emit the output with the captured last message
- If streaming fails, fall back to the poll-based monitor
This eliminates the eventual consistency issue where the events
list API doesn't have agent.message events immediately after the
session goes idle. In the stream, events arrive in chronological
order, so we always capture all messages before the terminal event.
Fixes #5278
|
👋 Commands for maintainers:
|
|
❌ OSS Guard found dependency licenses that are not permitted for this project. Project license (from repository): Apache-2.0 Permitted dependency licenses: MIT,Apache-2.0,BSD-2-Clause,BSD-3-Clause,ISC,0BSD,Unlicense,CC0-1.0,CC-BY-4.0,Zlib,MPL-2.0,OpenSSL,BlueOak-1.0.0 Reason: One or more dependencies use licenses that are not compatible with the project license. osv-scanner report: Add approved exceptions in your repository's |
- Output payload now includes 'messages' array with all agent
messages from the session (intermediate + final)
- 'lastMessage' still has just the final message for convenience
- Session is deleted via DELETE /sessions/{id} after completion
to clean up resources on Anthropic's side
- Updated tests and example output
|
❌ OSS Guard found dependency licenses that are not permitted for this project. Project license (from repository): Apache-2.0 Permitted dependency licenses: MIT,Apache-2.0,BSD-2-Clause,BSD-3-Clause,ISC,0BSD,Unlicense,CC0-1.0,CC-BY-4.0,Zlib,MPL-2.0,OpenSSL,BlueOak-1.0.0 Reason: One or more dependencies use licenses that are not compatible with the project license. osv-scanner report: Add approved exceptions in your repository's |
1. Stream uses cancellable context with 30min timeout instead of context.Background() — allows cleanup on cancellation 2. Update execution metadata with final status after stream completes — no longer shows stale 'running' state 3. HTTP timeout: streaming uses c.http.Do (same as other calls). If the 30s client timeout cuts the stream, falls back to polling. A dedicated streaming HTTP client would be a bigger infrastructure change tracked separately.
|
❌ OSS Guard found dependency licenses that are not permitted for this project. Project license (from repository): Apache-2.0 Permitted dependency licenses: MIT,Apache-2.0,BSD-2-Clause,BSD-3-Clause,ISC,0BSD,Unlicense,CC0-1.0,CC-BY-4.0,Zlib,MPL-2.0,OpenSSL,BlueOak-1.0.0 Reason: One or more dependencies use licenses that are not compatible with the project license. osv-scanner report: Add approved exceptions in your repository's |
Execute() runs inside a DB transaction that locks the execution row. Blocking on SSE streaming for up to 30 minutes would exhaust DB connections and block other operations. Fix: Execute() now schedules a 'stream' action call that runs outside the transaction. The stream handler opens the SSE connection, captures messages, and emits output. Falls back to 'poll' if streaming fails. Flow: 1. Execute() — creates session, sends message, schedules 'stream' 2. stream() — opens SSE, captures messages, emits output 3. poll() — fallback if streaming fails (exponential backoff)
|
❌ OSS Guard found dependency licenses that are not permitted for this project. Project license (from repository): Apache-2.0 Permitted dependency licenses: MIT,Apache-2.0,BSD-2-Clause,BSD-3-Clause,ISC,0BSD,Unlicense,CC0-1.0,CC-BY-4.0,Zlib,MPL-2.0,OpenSSL,BlueOak-1.0.0 Reason: One or more dependencies use licenses that are not compatible with the project license. osv-scanner report: Add approved exceptions in your repository's |
|
❌ OSS Guard found dependency licenses that are not permitted for this project. Project license (from repository): Apache-2.0 Permitted dependency licenses: MIT,Apache-2.0,BSD-2-Clause,BSD-3-Clause,ISC,0BSD,Unlicense,CC0-1.0,CC-BY-4.0,Zlib,MPL-2.0,OpenSSL,BlueOak-1.0.0 Reason: One or more dependencies use licenses that are not compatible with the project license. osv-scanner report: Add approved exceptions in your repository's |
|
❌ OSS Guard found dependency licenses that are not permitted for this project. Project license (from repository): Apache-2.0 Permitted dependency licenses: MIT,Apache-2.0,BSD-2-Clause,BSD-3-Clause,ISC,0BSD,Unlicense,CC0-1.0,CC-BY-4.0,Zlib,MPL-2.0,OpenSSL,BlueOak-1.0.0 Reason: One or more dependencies use licenses that are not compatible with the project license. osv-scanner report: Add approved exceptions in your repository's |
If streaming completes but captures no agent.message events, fall back to GetLastManagedSessionAgentMessageWithRetry to fetch the message from the events list API.
Two issues fixed: 1. Poll skips after stream metadata (High): The stream hook was deleting the session and writing terminal metadata BEFORE emit. If emit failed and fell back to poll, poll couldn't fetch the deleted session. Fix: emit first, then update metadata and delete session. If emit fails, session is still accessible for the poll fallback. 2. Stream hook missing finished guard (Medium): Added IsFinished() check at entry to prevent duplicate stream actions from opening unnecessary SSE connections.
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit 0872c09. Configure here.
| // Don't block Execute() — it runs inside a DB transaction. | ||
| // Schedule streaming as an action call that runs outside the transaction. | ||
| ctx.Logger.Infof("Started Managed Agent session %s. Scheduling stream...", session.ID) | ||
| return ctx.Requests.ScheduleActionCall("stream", map[string]any{"attempt": 1, "errors": 0}, 0) |
There was a problem hiding this comment.
Zero stream schedule interval
High Severity
Execute schedules the stream hook with a 0 delay, but production ExecutionRequestContext.ScheduleActionCall rejects any interval under one second. The action errors after the session is created and the user message is sent, so the stream never runs and the run fails.
Reviewed by Cursor Bugbot for commit 0872c09. Configure here.
| streamCtx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) | ||
| defer cancel() | ||
|
|
||
| status, lastMessage, messages, streamErr := client.StreamSessionUntilIdle(streamCtx, metadata.Session.ID) |
There was a problem hiding this comment.
Stream holds DB transaction
High Severity
The new stream hook blocks on StreamSessionUntilIdle for up to thirty minutes while the node request worker still holds an open Postgres transaction. Default idle_in_transaction_session_timeout is thirty seconds, so long-lived agent runs are likely to abort mid-stream instead of completing via SSE.
Reviewed by Cursor Bugbot for commit 0872c09. Configure here.
…#5292) ## Problem The `claude.runAgent` component returns empty or intermediate `lastMessage` because the events list API is eventually consistent (#5278). The session goes `idle` before `agent.message` events are queryable. ## Root cause The previous approach: poll session status → idle → fetch events → hope agent.message is there. But events are written asynchronously and may not be queryable yet. ## Solution **Check for event completeness, not just message presence.** When `session.status_idle` appears IN the events list (not just the session status endpoint), ALL events including ALL `agent.message` events are guaranteed to be present. ### New approach: 1. Poll session status → `idle` 2. Fetch events, check if `session.status_idle` is in the events list 3. If yes → events are complete, extract all messages 4. If no → retry (events still being written) ### Changes: - `GetSessionMessages()` — fetches all events, checks for terminal event, collects all agent messages in chronological order - `GetSessionMessagesWithRetry()` — polls until `Complete == true` - `SessionMessages` struct — `Messages []string`, `LastMessage string`, `Complete bool` - Output includes `messages` array alongside `lastMessage` - Session deleted after successful emit - Updated tests and example output Fixes #5278 Replaces #5280 (streaming approach was scrapped — team decided polling is the right pattern) --------- Co-authored-by: Bender Rodriguez <bender@superplanehq.com>


Problem
The
claude.runAgentcomponent returns empty or intermediatelastMessagebecause Anthropic's events list API is eventually consistent —agent.messageevents are not queryable immediately aftersession.status_idle(#5278).Solution
Replace poll-based completion detection with real-time SSE streaming via
GET /sessions/{id}/events/stream.How it works
/sessions/{id}/events/streamagent.message→ capture text (always keep latest)session.status_idle→ done, emit output with captured messagesession.status_terminated→ doneWhy this works
In the SSE stream, events arrive in chronological order. The
agent.messagealways appears BEFOREsession.status_idle, so we always have the final message when we emit output. No eventual consistency gap.Changes
client.go: NewStreamSessionUntilIdle()method with SSE parsingrun_agent.go:Execute()uses streaming instead of status check + poll schedulingFixes #5278