diff --git a/apps/atlasd/routes/sessions/index.ts b/apps/atlasd/routes/sessions/index.ts index 317cd4fb6..f490ac06b 100644 --- a/apps/atlasd/routes/sessions/index.ts +++ b/apps/atlasd/routes/sessions/index.ts @@ -11,7 +11,16 @@ import { daemonFactory } from "../../src/factory.ts"; import { publishSessionCancel } from "../../src/session-dispatch-registry.ts"; import { getAccessibleWorkspaceIds, requireWorkspaceMember } from "../../src/workspace-authz.ts"; -const ListQuery = z.object({ workspaceId: z.string().optional() }); +const ListQuery = z.object({ + workspaceId: z.string().optional(), + /** + * Filter to sessions whose `SessionSummary.correlationId` matches. + * Lets a `?nowait=true` signal-trigger caller redeem the 202's + * correlationId for the spawned sessionId without polling for a + * generic "newest non-snapshot" guess. + */ + correlationId: z.string().optional(), +}); /** * Build a SessionSummary from a SessionView (for active sessions @@ -30,6 +39,7 @@ function viewToSummary(view: SessionView): SessionSummary { stepCount: view.agentBlocks.length, agentNames: view.agentBlocks.map((b) => b.agentName), aiSummary: view.aiSummary, + ...(view.correlationId && { correlationId: view.correlationId }), }; } @@ -45,7 +55,7 @@ const sessionsRoutes = daemonFactory const userId = c.get("userId"); if (!userId) return c.json({ error: "Unauthorized" }, 401); - const { workspaceId } = c.req.valid("query"); + const { workspaceId, correlationId } = c.req.valid("query"); if (workspaceId !== undefined) { await requireWorkspaceMember(c, workspaceId); } @@ -70,10 +80,20 @@ const sessionsRoutes = daemonFactory } // Merge and sort by startedAt descending - const sessions = [...activeSummaries, ...completedSummaries].sort( + let sessions = [...activeSummaries, ...completedSummaries].sort( (a, b) => new Date(b.startedAt).getTime() - new Date(a.startedAt).getTime(), ); + // correlationId filter — applied after the workspace filter so a + // nowait caller can look up its spawned session without knowing + // the workspaceId (though typically it does). Returns at most one + // session in practice (correlationId is per-signal-trigger), but + // we don't enforce that here — callers should treat results as + // "all sessions tagged with this correlationId". + if (correlationId !== undefined) { + sessions = sessions.filter((s) => s.correlationId === correlationId); + } + return c.json({ sessions }, 200); }) /** diff --git a/apps/atlasd/src/atlas-daemon.ts b/apps/atlasd/src/atlas-daemon.ts index 03b0db6e9..714f720af 100644 --- a/apps/atlasd/src/atlas-daemon.ts +++ b/apps/atlasd/src/atlas-daemon.ts @@ -822,6 +822,12 @@ export class AtlasDaemon { // consumed — Phase 11 makes it carry across into // `SessionSummary.parentSessionId`. parentSessionId: envelope.sourceSessionId, + // Carry the HTTP trigger's correlationId through to the + // spawned session so a `?nowait=true` caller can redeem the + // 202's correlationId for this sessionId via + // `GET /api/sessions?correlationId=…`. Absent on + // non-correlated envelopes (cron, fs-watch). + correlationId: envelope.correlationId, webhookContext: envelopeToWebhookContext(envelope), }, ); diff --git a/packages/core/src/session/session-events.ts b/packages/core/src/session/session-events.ts index a7af2365e..f3ddcd92c 100644 --- a/packages/core/src/session/session-events.ts +++ b/packages/core/src/session/session-events.ts @@ -60,6 +60,14 @@ export const SessionStartEventSchema = z.object({ * inflight-marker writer to let listers filter without a join. */ signalId: z.string().optional(), + /** + * Correlation id from the HTTP envelope that triggered this session. + * Carried so the SessionView reducer can surface it on active + * sessions (the durable `SessionSummary.correlationId` only exists + * after finalization). Optional — absent for non-HTTP triggers or + * historical events that predate the field. + */ + correlationId: z.string().optional(), plannedSteps: z .array( z.object({ @@ -255,6 +263,14 @@ export const SessionViewSchema = z.object({ error: z.string().optional(), /** AI-generated summary produced at session finalization */ aiSummary: SessionAISummarySchema.optional(), + /** + * Correlation id from the HTTP envelope that triggered this session. + * Captured from the `session:start` event by the reducer. Lets the + * list endpoint surface correlationId for *active* sessions too — + * the durable `SessionSummary.correlationId` only exists post- + * finalization. + */ + correlationId: z.string().optional(), }); export type SessionView = z.infer; @@ -290,5 +306,16 @@ export const SessionSummarySchema = z.object({ * pure session-level linkage works without it. */ parentEventId: z.string().optional(), + /** + * Correlation id assigned by the signal trigger that spawned this + * session. Carried on the cascade envelope, threaded through to the + * session at creation. Lets a `?nowait=true` HTTP caller redeem the + * correlationId from the 202 response for the spawned sessionId via + * `GET /api/sessions?correlationId=…`, without polling for a generic + * "newest non-snapshot" guess. Absent for sessions spawned without a + * correlated envelope (chat-tools that call `bypassConcurrency`, + * cron signals, fs-watch signals). + */ + correlationId: z.string().optional(), }); export type SessionSummary = z.infer; diff --git a/packages/core/src/session/session-reducer.ts b/packages/core/src/session/session-reducer.ts index 5f406121e..ba0813856 100644 --- a/packages/core/src/session/session-reducer.ts +++ b/packages/core/src/session/session-reducer.ts @@ -55,6 +55,7 @@ export function reduceSessionEvent( task: event.task, status: "active", startedAt: event.timestamp, + ...(event.correlationId && { correlationId: event.correlationId }), agentBlocks: event.plannedSteps?.map((step) => ({ stepNumber: undefined, diff --git a/packages/workspace/src/runtime.ts b/packages/workspace/src/runtime.ts index eb60ec798..75adcdf46 100644 --- a/packages/workspace/src/runtime.ts +++ b/packages/workspace/src/runtime.ts @@ -242,6 +242,15 @@ interface WorkspaceRuntimeSignal { * provenance for crystallization. Absent for root sessions. */ parentSessionId?: string; + /** + * Correlation id assigned by the HTTP signal trigger that produced + * this envelope. Forwarded to `SessionSummary.correlationId` at + * finalization so a `?nowait=true` caller can redeem its 202's + * correlationId for the spawned sessionId via + * `GET /api/sessions?correlationId=…`. Absent for non-correlated + * triggers (cron, fs-watch, bypassConcurrency chat tools). + */ + correlationId?: string; /** * Base64-encoded original webhook request body. Only set for signals * fired through Friday's webhook-tunnel (the byte-for-byte HTTP proxy). @@ -277,6 +286,13 @@ export interface TriggerSignalOpts { * session records its parent. Phase 11 provenance. */ parentSessionId?: string; + /** + * Correlation id from the HTTP envelope that triggered this signal. + * Threads through to `SessionSummary.correlationId` so the spawned + * session is discoverable from the 202's `{ correlationId }` via + * `GET /api/sessions?correlationId=…`. Absent for non-HTTP triggers. + */ + correlationId?: string; /** * Webhook-only context (base64 body + lowercased headers) preserved * byte-for-byte from the upstream HTTP request when the signal was @@ -2049,6 +2065,7 @@ export class WorkspaceRuntime { task: typeof signal.data?.task === "string" ? signal.data.task : job.name, plannedSteps, timestamp: session.startedAt.toISOString(), + ...(signal.correlationId && { correlationId: signal.correlationId }), }); // Emit session-start to the client's SSE stream so the UI can display @@ -2429,6 +2446,10 @@ export class WorkspaceRuntime { // field on the wire — keeps existing SESSION_METADATA entries // and round-trip schema parses unchanged. ...(signal.parentSessionId && { parentSessionId: signal.parentSessionId }), + // Correlation id from the HTTP envelope. Lets nowait callers + // redeem their 202's correlationId for this sessionId via + // GET /api/sessions?correlationId=… + ...(signal.correlationId && { correlationId: signal.correlationId }), }; await sessionStream.finalize(summaryV2).catch((err) => { @@ -3446,8 +3467,15 @@ export class WorkspaceRuntime { payload?: Record, opts: TriggerSignalOpts = {}, ): Promise { - const { streamId, onStreamEvent, skipStates, abortSignal, parentSessionId, webhookContext } = - opts; + const { + streamId, + onStreamEvent, + skipStates, + abortSignal, + parentSessionId, + correlationId, + webhookContext, + } = opts; // Top-level `streamId` opt wins over any payload.streamId. The runtime // reads the merged value via `signal.data.streamId` (see processSignalForJob // ~line 1595 where streamId is derived). Both surfaces stay supported so @@ -3462,6 +3490,7 @@ export class WorkspaceRuntime { data, timestamp: new Date(), parentSessionId, + correlationId, body: webhookContext?.body, headers: webhookContext?.headers, }; diff --git a/tools/agent-playground/src/lib/components/workspace/run-job-dialog.svelte b/tools/agent-playground/src/lib/components/workspace/run-job-dialog.svelte index fa813e2cb..3c6d9817c 100644 --- a/tools/agent-playground/src/lib/components/workspace/run-job-dialog.svelte +++ b/tools/agent-playground/src/lib/components/workspace/run-job-dialog.svelte @@ -14,7 +14,7 @@ @@ -243,8 +358,8 @@ {/if}
- - Run + + {submitting ? "Starting…" : "Run"} Cancel
diff --git a/tools/agent-playground/src/lib/daemon-client.ts b/tools/agent-playground/src/lib/daemon-client.ts index a06d273f5..495a12e19 100644 --- a/tools/agent-playground/src/lib/daemon-client.ts +++ b/tools/agent-playground/src/lib/daemon-client.ts @@ -17,8 +17,12 @@ import { hc } from "hono/client"; /** * Proxy base — all daemon requests route through the SvelteKit proxy * at `/api/daemon/`, which strips the prefix and forwards to the daemon. + * + * Exported so callers that need to bypass the typed Hono RPC client + * (e.g. to append a query param the route doesn't validate) can build + * a URL against the same proxy base instead of hard-coding the prefix. */ -const PROXY_BASE = "/api/daemon"; +export const PROXY_BASE = "/api/daemon"; /** * Creates a typed Hono RPC client for the local daemon, routed through the