Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions apps/atlasd/routes/sessions/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,16 @@ import { daemonFactory } from "../../src/factory.ts";
import { publishSessionCancel } from "../../src/session-dispatch-registry.ts";
import { getAccessibleWorkspaceIds, requireWorkspaceMember } from "../../src/workspace-authz.ts";

const ListQuery = z.object({ workspaceId: z.string().optional() });
const ListQuery = z.object({
workspaceId: z.string().optional(),
/**
* Filter to sessions whose `SessionSummary.correlationId` matches.
* Lets a `?nowait=true` signal-trigger caller redeem the 202's
* correlationId for the spawned sessionId without polling for a
* generic "newest non-snapshot" guess.
*/
correlationId: z.string().optional(),
});

/**
* Build a SessionSummary from a SessionView (for active sessions
Expand All @@ -30,6 +39,7 @@ function viewToSummary(view: SessionView): SessionSummary {
stepCount: view.agentBlocks.length,
agentNames: view.agentBlocks.map((b) => b.agentName),
aiSummary: view.aiSummary,
...(view.correlationId && { correlationId: view.correlationId }),
};
}

Expand All @@ -45,7 +55,7 @@ const sessionsRoutes = daemonFactory
const userId = c.get("userId");
if (!userId) return c.json({ error: "Unauthorized" }, 401);

const { workspaceId } = c.req.valid("query");
const { workspaceId, correlationId } = c.req.valid("query");
if (workspaceId !== undefined) {
await requireWorkspaceMember(c, workspaceId);
}
Expand All @@ -70,10 +80,20 @@ const sessionsRoutes = daemonFactory
}

// Merge and sort by startedAt descending
const sessions = [...activeSummaries, ...completedSummaries].sort(
let sessions = [...activeSummaries, ...completedSummaries].sort(
(a, b) => new Date(b.startedAt).getTime() - new Date(a.startedAt).getTime(),
);

// correlationId filter β€” applied after the workspace filter so a
// nowait caller can look up its spawned session without knowing
// the workspaceId (though typically it does). Returns at most one
// session in practice (correlationId is per-signal-trigger), but
// we don't enforce that here β€” callers should treat results as
// "all sessions tagged with this correlationId".
if (correlationId !== undefined) {
sessions = sessions.filter((s) => s.correlationId === correlationId);
}

return c.json({ sessions }, 200);
})
/**
Expand Down
6 changes: 6 additions & 0 deletions apps/atlasd/src/atlas-daemon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,12 @@ export class AtlasDaemon {
// consumed β€” Phase 11 makes it carry across into
// `SessionSummary.parentSessionId`.
parentSessionId: envelope.sourceSessionId,
// Carry the HTTP trigger's correlationId through to the
// spawned session so a `?nowait=true` caller can redeem the
// 202's correlationId for this sessionId via
// `GET /api/sessions?correlationId=…`. Absent on
// non-correlated envelopes (cron, fs-watch).
correlationId: envelope.correlationId,
webhookContext: envelopeToWebhookContext(envelope),
},
);
Expand Down
27 changes: 27 additions & 0 deletions packages/core/src/session/session-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ export const SessionStartEventSchema = z.object({
* inflight-marker writer to let listers filter without a join.
*/
signalId: z.string().optional(),
/**
* Correlation id from the HTTP envelope that triggered this session.
* Carried so the SessionView reducer can surface it on active
* sessions (the durable `SessionSummary.correlationId` only exists
* after finalization). Optional β€” absent for non-HTTP triggers or
* historical events that predate the field.
*/
correlationId: z.string().optional(),
plannedSteps: z
.array(
z.object({
Expand Down Expand Up @@ -255,6 +263,14 @@ export const SessionViewSchema = z.object({
error: z.string().optional(),
/** AI-generated summary produced at session finalization */
aiSummary: SessionAISummarySchema.optional(),
/**
* Correlation id from the HTTP envelope that triggered this session.
* Captured from the `session:start` event by the reducer. Lets the
* list endpoint surface correlationId for *active* sessions too β€”
* the durable `SessionSummary.correlationId` only exists post-
* finalization.
*/
correlationId: z.string().optional(),
});
export type SessionView = z.infer<typeof SessionViewSchema>;

Expand Down Expand Up @@ -290,5 +306,16 @@ export const SessionSummarySchema = z.object({
* pure session-level linkage works without it.
*/
parentEventId: z.string().optional(),
/**
* Correlation id assigned by the signal trigger that spawned this
* session. Carried on the cascade envelope, threaded through to the
* session at creation. Lets a `?nowait=true` HTTP caller redeem the
* correlationId from the 202 response for the spawned sessionId via
* `GET /api/sessions?correlationId=…`, without polling for a generic
* "newest non-snapshot" guess. Absent for sessions spawned without a
* correlated envelope (chat-tools that call `bypassConcurrency`,
* cron signals, fs-watch signals).
*/
correlationId: z.string().optional(),
});
export type SessionSummary = z.infer<typeof SessionSummarySchema>;
1 change: 1 addition & 0 deletions packages/core/src/session/session-reducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ export function reduceSessionEvent(
task: event.task,
status: "active",
startedAt: event.timestamp,
...(event.correlationId && { correlationId: event.correlationId }),
agentBlocks:
event.plannedSteps?.map((step) => ({
stepNumber: undefined,
Expand Down
33 changes: 31 additions & 2 deletions packages/workspace/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,15 @@ interface WorkspaceRuntimeSignal {
* provenance for crystallization. Absent for root sessions.
*/
parentSessionId?: string;
/**
* Correlation id assigned by the HTTP signal trigger that produced
* this envelope. Forwarded to `SessionSummary.correlationId` at
* finalization so a `?nowait=true` caller can redeem its 202's
* correlationId for the spawned sessionId via
* `GET /api/sessions?correlationId=…`. Absent for non-correlated
* triggers (cron, fs-watch, bypassConcurrency chat tools).
*/
correlationId?: string;
/**
* Base64-encoded original webhook request body. Only set for signals
* fired through Friday's webhook-tunnel (the byte-for-byte HTTP proxy).
Expand Down Expand Up @@ -277,6 +286,13 @@ export interface TriggerSignalOpts {
* session records its parent. Phase 11 provenance.
*/
parentSessionId?: string;
/**
* Correlation id from the HTTP envelope that triggered this signal.
* Threads through to `SessionSummary.correlationId` so the spawned
* session is discoverable from the 202's `{ correlationId }` via
* `GET /api/sessions?correlationId=…`. Absent for non-HTTP triggers.
*/
correlationId?: string;
/**
* Webhook-only context (base64 body + lowercased headers) preserved
* byte-for-byte from the upstream HTTP request when the signal was
Expand Down Expand Up @@ -2049,6 +2065,7 @@ export class WorkspaceRuntime {
task: typeof signal.data?.task === "string" ? signal.data.task : job.name,
plannedSteps,
timestamp: session.startedAt.toISOString(),
...(signal.correlationId && { correlationId: signal.correlationId }),
});

// Emit session-start to the client's SSE stream so the UI can display
Expand Down Expand Up @@ -2429,6 +2446,10 @@ export class WorkspaceRuntime {
// field on the wire β€” keeps existing SESSION_METADATA entries
// and round-trip schema parses unchanged.
...(signal.parentSessionId && { parentSessionId: signal.parentSessionId }),
// Correlation id from the HTTP envelope. Lets nowait callers
// redeem their 202's correlationId for this sessionId via
// GET /api/sessions?correlationId=…
...(signal.correlationId && { correlationId: signal.correlationId }),
};

await sessionStream.finalize(summaryV2).catch((err) => {
Expand Down Expand Up @@ -3446,8 +3467,15 @@ export class WorkspaceRuntime {
payload?: Record<string, unknown>,
opts: TriggerSignalOpts = {},
): Promise<WorkspaceSignalRunResult> {
const { streamId, onStreamEvent, skipStates, abortSignal, parentSessionId, webhookContext } =
opts;
const {
streamId,
onStreamEvent,
skipStates,
abortSignal,
parentSessionId,
correlationId,
webhookContext,
} = opts;
// Top-level `streamId` opt wins over any payload.streamId. The runtime
// reads the merged value via `signal.data.streamId` (see processSignalForJob
// ~line 1595 where streamId is derived). Both surfaces stay supported so
Expand All @@ -3462,6 +3490,7 @@ export class WorkspaceRuntime {
data,
timestamp: new Date(),
parentSessionId,
correlationId,
body: webhookContext?.body,
headers: webhookContext?.headers,
};
Expand Down
Loading