From b208b49554387df0cfc02ad9825a23a45bd82027 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Mon, 11 May 2026 19:40:53 +0100 Subject: [PATCH 1/2] refactor(run-engine): route TTL expiration through the batch path only MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit TTL expiration on queued runs was being scheduled twice: once via a per-run expireRun worker job (the original implementation) and once via the batch TTL system (added more recently). The per-run job almost always won the race, leaving the batch consumer to observe runs already expired by the older path. Collapse onto the batch path. engine.trigger and delayedRunSystem.enqueueDelayedRun no longer schedule the per-run expireRun job. The TTL sorted set + batch consumer is now the only mechanism that flips queued runs to EXPIRED. Delayed runs get the same coverage by passing includeTtl: true on their post-delay enqueue, so the TTL is armed from the moment the run enters the queue (matching how the old job behaved). The new path intentionally does not re-expire runs that have already been allocated a concurrency slot — TTL is for runs that are queued and have never started. --- .server-changes/run-engine-single-ttl-path.md | 6 + .../run-engine/src/engine/index.ts | 6 +- .../src/engine/systems/delayedRunSystem.ts | 20 +- .../src/engine/tests/delays.test.ts | 45 ++++- .../src/engine/tests/lazyWaitpoint.test.ts | 21 ++- .../run-engine/src/engine/tests/ttl.test.ts | 172 +++++++++++++++++- 6 files changed, 231 insertions(+), 39 deletions(-) create mode 100644 .server-changes/run-engine-single-ttl-path.md diff --git a/.server-changes/run-engine-single-ttl-path.md b/.server-changes/run-engine-single-ttl-path.md new file mode 100644 index 00000000000..cac5e3a3cb1 --- /dev/null +++ b/.server-changes/run-engine-single-ttl-path.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: improvement +--- + +Route TTL expiration through the batch TTL path only. 
Removes the redundant per-run `expireRun` worker job for deployed environments, leaving the batch consumer as the mechanism that flips queued runs to `EXPIRED` when their TTL elapses. The per-run job is still scheduled for DEV environments, where fast-pathed runs bypass the run queue and the batch TTL path cannot see them. diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 1725587df45..bb977faa925 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -797,10 +797,6 @@ export class RunEngine { } } else { try { - if (taskRun.ttl) { - await this.ttlSystem.scheduleExpireRun({ runId: taskRun.id, ttl: taskRun.ttl }); - } - await this.enqueueSystem.enqueueRun({ run: taskRun, env: environment, @@ -812,7 +808,7 @@ export class RunEngine { enableFastPath, }); } catch (enqueueError) { - this.logger.error("engine.trigger(): failed to schedule TTL or enqueue run", { + this.logger.error("engine.trigger(): failed to enqueue run", { runId: taskRun.id, friendlyId: taskRun.friendlyId, taskIdentifier: taskRun.taskIdentifier, diff --git a/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts b/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts index 740ce1a849f..b56fc4c7c52 100644 --- a/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts @@ -2,7 +2,6 @@ import { startSpan } from "@internal/tracing"; import { SystemResources } from "./systems.js"; import { PrismaClientOrTransaction, TaskRun } from "@trigger.dev/database"; import { getLatestExecutionSnapshot } from "./executionSnapshotSystem.js"; -import { parseNaturalLanguageDuration } from "@trigger.dev/core/v3/isomorphic"; import { EnqueueSystem } from "./enqueueSystem.js"; import { ServiceValidationError } from "../errors.js"; @@ -145,12 +144,17 @@ export class DelayedRunSystem { } // Now we need to enqueue the run into the RunQueue - // Skip the lock in enqueueRun since we already hold it + // Skip the lock in enqueueRun since 
we already hold it. + // includeTtl: true so the run's TTL is armed from the moment it enters + // the queue (not from taskRun.createdAt). The TTL system tracks runs + // that are queued and have never started — delayed runs are first + // enqueued here, so this is the correct point to arm TTL. await this.enqueueSystem.enqueueRun({ run, env: run.runtimeEnvironment, batchId: run.batchId ?? undefined, skipRunLock: true, + includeTtl: true, }); const queuedAt = new Date(); @@ -183,18 +187,6 @@ export class DelayedRunSystem { }, }); - if (run.ttl) { - const expireAt = parseNaturalLanguageDuration(run.ttl); - - if (expireAt) { - await this.$.worker.enqueue({ - id: `expireRun:${runId}`, - job: "expireRun", - payload: { runId }, - availableAt: expireAt, - }); - } - } }); } diff --git a/internal-packages/run-engine/src/engine/tests/delays.test.ts b/internal-packages/run-engine/src/engine/tests/delays.test.ts index 8a93aa1ad14..81e3641cd74 100644 --- a/internal-packages/run-engine/src/engine/tests/delays.test.ts +++ b/internal-packages/run-engine/src/engine/tests/delays.test.ts @@ -201,6 +201,11 @@ describe("RunEngine delays", () => { }, queue: { redis: redisOptions, + ttlSystem: { + pollIntervalMs: 100, + batchSize: 10, + batchMaxWaitMs: 100, + }, }, runLock: { redis: redisOptions, @@ -230,7 +235,21 @@ describe("RunEngine delays", () => { taskIdentifier ); + // TTL only expires runs still queued waiting on a concurrency slot. + // Once the delay elapses, the run gets enqueued; saturate env concurrency + // so it stays queued so the new TTL path can expire it. 
+ await engine.runQueue.updateEnvConcurrencyLimits({ + ...authenticatedEnvironment, + maximumConcurrencyLimit: 0, + }); + + const enqueuedAfterDelayTimes: number[] = []; + engine.eventBus.on("runEnqueuedAfterDelay", () => { + enqueuedAfterDelayTimes.push(Date.now()); + }); + //trigger the run + const triggerTime = Date.now(); const run = await engine.trigger( { number: 1, @@ -247,7 +266,7 @@ describe("RunEngine delays", () => { queue: "task/test-task", isTest: false, tags: [], - delayUntil: new Date(Date.now() + 1000), + delayUntil: new Date(triggerTime + 1000), ttl: "2s", }, prisma @@ -259,7 +278,7 @@ describe("RunEngine delays", () => { expect(executionData.snapshot.executionStatus).toBe("DELAYED"); expect(run.status).toBe("DELAYED"); - //wait for 1 seconds + //wait so the delay elapses and the run is enqueued await setTimeout(2_500); //should now be queued @@ -273,19 +292,29 @@ describe("RunEngine delays", () => { expect(run2.status).toBe("PENDING"); - //wait for 3 seconds - await setTimeout(3_000); + // TTL is armed at queue-enter time (not from triggerTime). With a 2s TTL + // and a 1s delay, the run becomes eligible to expire ~3s after trigger. + // Confirm the TTL was not armed against triggerTime (i.e. didn't already + // fire while still DELAYED), and that the run only expires after the + // queue-enter timestamp + ttl has elapsed. + expect(enqueuedAfterDelayTimes.length).toBe(1); + const enqueuedAt = enqueuedAfterDelayTimes[0]!; + expect(enqueuedAt - triggerTime).toBeGreaterThanOrEqual(1000); - //should now be expired - const executionData3 = await engine.getRunExecutionData({ runId: run.id }); - assertNonNullable(executionData3); - expect(executionData3.snapshot.executionStatus).toBe("FINISHED"); + //wait so the TTL fires (counted from when the run was enqueued) + await setTimeout(3_000); + // Status comes from the DB; the batch TTL path does not create + // execution snapshots, so getRunExecutionData may still show QUEUED. 
const run3 = await prisma.taskRun.findFirstOrThrow({ where: { id: run.id }, }); expect(run3.status).toBe("EXPIRED"); + assertNonNullable(run3.expiredAt); + // The expiry must happen after enqueue + ttl, not after trigger + ttl. + // Allow a small tolerance for poll interval + batch wait. + expect(run3.expiredAt.getTime()).toBeGreaterThanOrEqual(enqueuedAt + 2_000); } finally { await engine.quit(); } diff --git a/internal-packages/run-engine/src/engine/tests/lazyWaitpoint.test.ts b/internal-packages/run-engine/src/engine/tests/lazyWaitpoint.test.ts index bc24f9b6f1a..dedcff5b5b6 100644 --- a/internal-packages/run-engine/src/engine/tests/lazyWaitpoint.test.ts +++ b/internal-packages/run-engine/src/engine/tests/lazyWaitpoint.test.ts @@ -409,6 +409,7 @@ describe("RunEngine lazy waitpoint creation", () => { ttlSystem: { pollIntervalMs: 100, batchSize: 10, + batchMaxWaitMs: 100, }, }, runLock: { @@ -434,6 +435,12 @@ describe("RunEngine lazy waitpoint creation", () => { await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + // TTL only expires runs still queued waiting on a concurrency slot. + await engine.runQueue.updateEnvConcurrencyLimits({ + ...authenticatedEnvironment, + maximumConcurrencyLimit: 0, + }); + // Trigger a standalone run with TTL (no waitpoint) const run = await engine.trigger( { @@ -467,11 +474,15 @@ describe("RunEngine lazy waitpoint creation", () => { // Wait for TTL to expire await setTimeout(1_500); - // Verify run expired successfully (no throw) - const executionData = await engine.getRunExecutionData({ runId: run.id }); - assertNonNullable(executionData); - expect(executionData.run.status).toBe("EXPIRED"); - expect(executionData.snapshot.executionStatus).toBe("FINISHED"); + // Verify run expired successfully (no throw). + // The batch TTL path does not create execution snapshots, so check + // the status directly from the database rather than via + // getRunExecutionData. 
+ const expiredRun = await prisma.taskRun.findUnique({ + where: { id: run.id }, + select: { status: true }, + }); + expect(expiredRun?.status).toBe("EXPIRED"); } finally { await engine.quit(); } diff --git a/internal-packages/run-engine/src/engine/tests/ttl.test.ts b/internal-packages/run-engine/src/engine/tests/ttl.test.ts index c1df00bf13f..cbb3de1d18f 100644 --- a/internal-packages/run-engine/src/engine/tests/ttl.test.ts +++ b/internal-packages/run-engine/src/engine/tests/ttl.test.ts @@ -1,6 +1,7 @@ import { containerTest, assertNonNullable } from "@internal/testcontainers"; import { trace } from "@internal/tracing"; import { expect } from "vitest"; +import { Decimal } from "@trigger.dev/database"; import { RunEngine } from "../index.js"; import { setTimeout } from "timers/promises"; import { EventBusEventArgs } from "../eventBus.js"; @@ -28,6 +29,7 @@ describe("RunEngine ttl", () => { ttlSystem: { pollIntervalMs: 100, batchSize: 10, + batchMaxWaitMs: 100, }, }, runLock: { @@ -58,6 +60,14 @@ describe("RunEngine ttl", () => { taskIdentifier ); + // TTL only expires runs still queued waiting on a concurrency slot. + // Force env concurrency to 0 so the run never gets dequeued and stays + // in the TTL set long enough for the consumer to expire it. + await engine.runQueue.updateEnvConcurrencyLimits({ + ...authenticatedEnvironment, + maximumConcurrencyLimit: 0, + }); + //trigger the run const run = await engine.trigger( { @@ -153,6 +163,7 @@ describe("RunEngine ttl", () => { ttlSystem: { pollIntervalMs: 100, batchSize: 10, + batchMaxWaitMs: 100, }, }, runLock: { @@ -231,6 +242,7 @@ describe("RunEngine ttl", () => { ttlSystem: { pollIntervalMs: 100, batchSize: 10, + batchMaxWaitMs: 100, }, }, runLock: { @@ -302,6 +314,124 @@ describe("RunEngine ttl", () => { } }); + containerTest( + "Re-enqueued runs are not expired by TTL once they have started", + async ({ prisma, redisOptions }) => { + // Contract: TTL only applies to runs that are queued and have never started. 
+ // Once a run has been dequeued (started executing), a subsequent re-enqueue + // (e.g. after a waitpoint, checkpoint resume, or pending-version flow) + // must not re-arm TTL, even if the original TTL deadline has long passed. + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const expiredEvents: EventBusEventArgs<"runExpired">[0][] = []; + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + processWorkerQueueDebounceMs: 50, + masterQueueConsumersDisabled: true, + ttlSystem: { + pollIntervalMs: 100, + batchSize: 10, + batchMaxWaitMs: 100, + }, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + engine.eventBus.on("runExpired", (result) => { + expiredEvents.push(result); + }); + + const run = await engine.trigger( + { + number: 1, + friendlyId: "run_restart01", + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t_re2", + spanId: "s_re2", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + ttl: "1s", + }, + prisma + ); + + // Dequeue the run — this simulates the run starting to execute, which + // ZREMs its TTL set entry. 
+ await engine.runQueue.processMasterQueueForEnvironment( + authenticatedEnvironment.id, + 10 + ); + const dequeued = await engine.dequeueFromWorkerQueue({ + consumerId: "test-consumer", + workerQueue: "main", + blockingPopTimeoutSeconds: 1, + }); + expect(dequeued.length).toBe(1); + + // Re-enqueue without includeTtl — this is what waitpoint/checkpoint + // resume paths do. + await engine.enqueueSystem.enqueueRun({ + run, + env: authenticatedEnvironment, + tx: prisma, + skipRunLock: true, + includeTtl: false, + }); + + // Wait well past the original 1s TTL deadline. The run was first + // enqueued ~0s ago, so this is far beyond the original deadline. + await setTimeout(2_500); + + // Run must still exist and must NOT have been expired. + expect(expiredEvents.length).toBe(0); + const reenqueuedRun = await prisma.taskRun.findUnique({ + where: { id: run.id }, + select: { status: true }, + }); + // Whatever status the dequeue/re-enqueue flow leaves the run in, it + // must NOT be EXPIRED — that's the contract this test locks in. + expect(reenqueuedRun?.status).not.toBe("EXPIRED"); + } finally { + await engine.quit(); + } + } + ); + containerTest("Multiple runs expiring via TTL batch", async ({ prisma, redisOptions }) => { const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); @@ -322,6 +452,7 @@ describe("RunEngine ttl", () => { ttlSystem: { pollIntervalMs: 100, batchSize: 10, + batchMaxWaitMs: 100, }, }, runLock: { @@ -347,6 +478,12 @@ describe("RunEngine ttl", () => { await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + // TTL only expires runs still queued waiting on a concurrency slot. 
+ await engine.runQueue.updateEnvConcurrencyLimits({ + ...authenticatedEnvironment, + maximumConcurrencyLimit: 0, + }); + engine.eventBus.on("runExpired", (result) => { expiredEvents.push(result); }); @@ -384,8 +521,10 @@ describe("RunEngine ttl", () => { expect(executionData.snapshot.executionStatus).toBe("QUEUED"); } - // Wait for TTL to expire - await setTimeout(1_500); + // Wait for TTL to expire. Concurrent triggers can land in different + // 100ms TTL-poll windows, so allow enough headroom for any stragglers + // to be claimed in a subsequent poll and flushed. + await setTimeout(2_500); // All runs should be expired expect(expiredEvents.length).toBe(3); @@ -450,6 +589,7 @@ describe("RunEngine ttl", () => { ttlSystem: { pollIntervalMs: 100, batchSize: 10, + batchMaxWaitMs: 100, }, }, runLock: { @@ -538,6 +678,7 @@ describe("RunEngine ttl", () => { ttlSystem: { pollIntervalMs: 100, batchSize: 10, + batchMaxWaitMs: 100, }, }, runLock: { @@ -563,6 +704,12 @@ describe("RunEngine ttl", () => { await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + // TTL only expires runs still queued waiting on a concurrency slot. + await engine.runQueue.updateEnvConcurrencyLimits({ + ...authenticatedEnvironment, + maximumConcurrencyLimit: 0, + }); + engine.eventBus.on("runExpired", (result) => { expiredEvents.push(result); }); @@ -631,10 +778,9 @@ describe("RunEngine ttl", () => { const expiredEvents: EventBusEventArgs<"runExpired">[0][] = []; - // Disable worker to prevent the scheduleExpireRun job from firing before - // we can test the dequeue path. Use masterQueueConsumersDisabled so we can - // manually trigger dequeue via processMasterQueueForEnvironment. - // TTL consumers start independently and will expire the run after their poll interval. + // Use masterQueueConsumersDisabled so we can manually trigger dequeue via + // processMasterQueueForEnvironment. TTL consumers start independently and + // will expire the run after their poll interval. 
const engine = new RunEngine({ prisma, worker: { @@ -651,6 +797,7 @@ describe("RunEngine ttl", () => { ttlSystem: { pollIntervalMs: 5000, batchSize: 10, + batchMaxWaitMs: 100, }, }, runLock: { @@ -781,6 +928,7 @@ describe("RunEngine ttl", () => { ttlSystem: { pollIntervalMs: 5000, batchSize: 10, + batchMaxWaitMs: 100, }, }, runLock: { @@ -878,7 +1026,6 @@ describe("RunEngine ttl", () => { async ({ prisma, redisOptions }) => { const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); - // Disable worker to prevent the scheduleExpireRun job from firing. // Use masterQueueConsumersDisabled so we can manually trigger dequeue. // Very long TTL consumer interval so it doesn't interfere. const engine = new RunEngine({ @@ -1246,6 +1393,7 @@ describe("RunEngine ttl", () => { ttlSystem: { pollIntervalMs: 100, batchSize: 10, + batchMaxWaitMs: 100, }, }, runLock: { @@ -1272,6 +1420,16 @@ describe("RunEngine ttl", () => { await setupBackgroundWorker(engine, authenticatedEnvironment, [parentTask, childTask]); + // TTL only expires runs still queued waiting on a concurrency slot. + // Cap env concurrency at exactly 1 (limit=1, burstFactor=1) so the + // parent takes the only slot and the child stays queued long enough + // for the new TTL path to expire it. + await engine.runQueue.updateEnvConcurrencyLimits({ + ...authenticatedEnvironment, + maximumConcurrencyLimit: 1, + concurrencyLimitBurstFactor: new Decimal(1.0), + }); + // Trigger the parent run const parentRun = await engine.trigger( { From aaf2a93591b6ee3084066743ecc89e44e31ce01e Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Mon, 11 May 2026 23:14:37 +0100 Subject: [PATCH 2/2] refactor(run-engine): keep legacy expireRun job for DEV; arm TTL on pendingVersion re-enqueue Two gaps in the previous commit's collapse onto the batch TTL path: - DEV runs are fast-pathed straight to the worker queue and never enter the TTL sorted set. 
If the dev CLI isn't running, those runs would sit on the worker queue forever without the batch path ever seeing them. In non-DEV environments runs always execute once they're on the worker queue, so this isn't a concern there. Restore the legacy per-run expireRun job, gated on environment.type === "DEVELOPMENT". - pendingVersionSystem.enqueueRunsForBackgroundWorker re-enqueues runs that were waiting for a matching worker version. That re-enqueue is the first time the run is actually queued for a worker, so pass includeTtl: true to arm TTL on the new message. Adds a DEV TTL test that exercises the fast-path coverage and a pendingVersion test that asserts ttlExpiresAt is present after re-enqueue. --- .../run-engine/src/engine/index.ts | 10 ++ .../src/engine/systems/delayedRunSystem.ts | 17 +++ .../engine/systems/pendingVersionSystem.ts | 5 + .../src/engine/tests/pendingVersion.test.ts | 116 ++++++++++++++++++ .../run-engine/src/engine/tests/ttl.test.ts | 97 +++++++++++++++ 5 files changed, 245 insertions(+) diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index bb977faa925..e0af0f2c4ff 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -797,6 +797,16 @@ export class RunEngine { } } else { try { + // The new batch TTL path only expires runs still in the queue + // sorted set (waiting on a concurrency slot). For DEV + // environments where the dev CLI may not be running, fast-pathed + // runs can sit on the worker queue indefinitely and never get + // claimed for expiration. Keep the legacy per-run expireRun job + // armed for DEV so those runs still expire. 
+ if (taskRun.ttl && environment.type === "DEVELOPMENT") { + await this.ttlSystem.scheduleExpireRun({ runId: taskRun.id, ttl: taskRun.ttl }); + } + await this.enqueueSystem.enqueueRun({ run: taskRun, env: environment, diff --git a/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts b/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts index b56fc4c7c52..32ab98bad6c 100644 --- a/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts @@ -2,6 +2,7 @@ import { startSpan } from "@internal/tracing"; import { SystemResources } from "./systems.js"; import { PrismaClientOrTransaction, TaskRun } from "@trigger.dev/database"; import { getLatestExecutionSnapshot } from "./executionSnapshotSystem.js"; +import { parseNaturalLanguageDuration } from "@trigger.dev/core/v3/isomorphic"; import { EnqueueSystem } from "./enqueueSystem.js"; import { ServiceValidationError } from "../errors.js"; @@ -143,6 +144,22 @@ export class DelayedRunSystem { return; } + // The batch TTL path only expires runs still in the queue sorted set. + // For DEV environments where the dev CLI may not be running, fast-pathed + // runs can sit on the worker queue indefinitely. Keep the legacy per-run + // expireRun job armed for DEV so those runs still expire. + if (run.ttl && run.runtimeEnvironment.type === "DEVELOPMENT") { + const expireAt = parseNaturalLanguageDuration(run.ttl); + if (expireAt) { + await this.$.worker.enqueue({ + id: `expireRun:${runId}`, + job: "expireRun", + payload: { runId }, + availableAt: expireAt, + }); + } + } + // Now we need to enqueue the run into the RunQueue // Skip the lock in enqueueRun since we already hold it. 
// includeTtl: true so the run's TTL is armed from the moment it enters diff --git a/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts b/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts index c22429ac0c9..9007cf86b2d 100644 --- a/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts @@ -98,6 +98,11 @@ export class PendingVersionSystem { run: updatedRun, env: backgroundWorker.runtimeEnvironment, tx, + // PENDING_VERSION re-enqueue is the first time this run is actually + // entering the run queue (the original enqueue was held back waiting + // for a worker version). Arm TTL here so the TTL system can expire it + // if it sits queued waiting on a concurrency slot. + includeTtl: true, }); }); diff --git a/internal-packages/run-engine/src/engine/tests/pendingVersion.test.ts b/internal-packages/run-engine/src/engine/tests/pendingVersion.test.ts index 65498e32ffe..38eaa00b213 100644 --- a/internal-packages/run-engine/src/engine/tests/pendingVersion.test.ts +++ b/internal-packages/run-engine/src/engine/tests/pendingVersion.test.ts @@ -309,4 +309,120 @@ describe("RunEngine pending version", () => { } } ); + + containerTest( + "PENDING_VERSION re-enqueue arms TTL on the queued message", + async ({ prisma, redisOptions }) => { + // When a run enters PENDING_VERSION (background worker doesn't yet have + // the task), the first enqueue happens but the message is dequeued and + // its TTL set entry is dropped while the run waits for a matching worker. + // Once a worker arrives, pendingVersionSystem re-enqueues. That + // re-enqueue is the first time the run is actually queued for a worker, + // so TTL must be armed at that point — not held over from the original + // enqueue. 
+ const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + processWorkerQueueDebounceMs: 50, + masterQueueConsumersDisabled: true, + ttlSystem: { + pollIntervalMs: 100, + batchSize: 10, + batchMaxWaitMs: 100, + }, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + + // Trigger a run with TTL — no background worker exists yet for this + // task, so it will end up in PENDING_VERSION. + const run = await engine.trigger( + { + number: 1, + friendlyId: "run_pvttl1", + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "tpv1", + spanId: "spv1", + queue: "task/test-task", + isTest: false, + tags: [], + ttl: "10m", + }, + prisma + ); + + // A worker arrives that doesn't have this task — flushes the run to + // PENDING_VERSION. + await setupBackgroundWorker(engine, authenticatedEnvironment, ["test-task-other"]); + + await setTimeout(500); + + // The consumer attempt is what flushes the run to PENDING_VERSION — + // dequeue finds no matching task version and returns nothing. 
+ const dequeuedEmpty = await engine.dequeueFromWorkerQueue({ + consumerId: "test_pv", + workerQueue: "main", + }); + expect(dequeuedEmpty.length).toBe(0); + + const executionDataAfter = await engine.getRunExecutionData({ runId: run.id }); + assertNonNullable(executionDataAfter); + expect(executionDataAfter.run.status).toBe("PENDING_VERSION"); + + // Now a worker arrives WITH the task — pendingVersionSystem + // re-enqueues the run. + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + await setTimeout(1000); + + const executionDataReenqueued = await engine.getRunExecutionData({ runId: run.id }); + assertNonNullable(executionDataReenqueued); + expect(executionDataReenqueued.run.status).toBe("PENDING"); + + // The re-enqueued message must carry ttlExpiresAt so the TTL set + // tracks it for expiration. + const message = await engine.runQueue.readMessage( + authenticatedEnvironment.organization.id, + run.id + ); + assertNonNullable(message); + expect(message.ttlExpiresAt).toBeDefined(); + expect(typeof message.ttlExpiresAt).toBe("number"); + } finally { + await engine.quit(); + } + } + ); }); diff --git a/internal-packages/run-engine/src/engine/tests/ttl.test.ts b/internal-packages/run-engine/src/engine/tests/ttl.test.ts index cbb3de1d18f..e787d916f8a 100644 --- a/internal-packages/run-engine/src/engine/tests/ttl.test.ts +++ b/internal-packages/run-engine/src/engine/tests/ttl.test.ts @@ -432,6 +432,103 @@ describe("RunEngine ttl", () => { } ); + containerTest( + "DEV runs sitting on worker queue still expire via legacy per-run job", + async ({ prisma, redisOptions }) => { + // The batch TTL path only expires runs still in the queue sorted set. + // In DEV, runs are fast-pathed straight to the worker queue, and if the + // dev CLI isn't running they can sit there forever. The legacy per-run + // expireRun job is kept for DEV specifically to cover this case. 
+ const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "DEVELOPMENT"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + processWorkerQueueDebounceMs: 50, + masterQueueConsumersDisabled: true, + // TTL batch path is enabled but should never see this run: it goes + // straight to the worker queue via fast-path. The legacy per-run + // job is what should expire it. + ttlSystem: { + pollIntervalMs: 100, + batchSize: 10, + batchMaxWaitMs: 100, + }, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + let expiredEventData: EventBusEventArgs<"runExpired">[0] | undefined; + engine.eventBus.on("runExpired", (result) => { + expiredEventData = result; + }); + + // Trigger a DEV run with fast-path enabled and a short TTL. The run + // should land in the worker queue without entering the TTL set. + const run = await engine.trigger( + { + number: 1, + friendlyId: "run_devttl1", + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "tdevttl1", + spanId: "sdevttl1", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + ttl: "1s", + enableFastPath: true, + }, + prisma + ); + + // Wait past the TTL. The legacy per-run job should fire and expire it. 
+ await setTimeout(1_500); + + assertNonNullable(expiredEventData); + const expiredRun = await prisma.taskRun.findUnique({ + where: { id: run.id }, + select: { status: true }, + }); + expect(expiredRun?.status).toBe("EXPIRED"); + } finally { + await engine.quit(); + } + } + ); + containerTest("Multiple runs expiring via TTL batch", async ({ prisma, redisOptions }) => { const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");