From de4612cc8b389a776c7ce86d8e2c852feda9b079 Mon Sep 17 00:00:00 2001 From: Yoan Bozhilov Date: Wed, 10 Jun 2026 14:37:01 +0300 Subject: [PATCH 1/3] fix(taskprocessing): claim tasks atomically with SKIP LOCKED + composite index Replace the worker retry/ignore-list claim-loop with a single atomic SELECT ... FOR UPDATE SKIP LOCKED claim (SQLite bounded-retry fallback), preserving the no-duplicate guarantee while removing the thundering-herd contention that throttled backlog draining. Add a (status,type,last_updated) index via the table-creating migration + db:add-missing-indices listener. Signed-off-by: Yoan Bozhilov Assisted-by: Claude Code:claude-opus-4-8 --- core/Command/TaskProcessing/WorkerCommand.php | 28 ++-- core/Listener/AddMissingIndicesListener.php | 6 + .../Version30000Date20240429122720.php | 1 + lib/private/TaskProcessing/Db/TaskMapper.php | 155 ++++++++++++++++++ lib/private/TaskProcessing/Manager.php | 15 ++ lib/public/TaskProcessing/IManager.php | 16 ++ .../TaskProcessing/WorkerCommandTest.php | 28 ++-- .../TaskProcessing/SkipLockedSqlShapeTest.php | 90 ++++++++++ .../lib/TaskProcessing/TaskProcessingTest.php | 137 ++++++++++++++++ 9 files changed, 451 insertions(+), 25 deletions(-) create mode 100644 tests/lib/TaskProcessing/SkipLockedSqlShapeTest.php diff --git a/core/Command/TaskProcessing/WorkerCommand.php b/core/Command/TaskProcessing/WorkerCommand.php index 1f6d473f4e849..98bca8f84d1e8 100644 --- a/core/Command/TaskProcessing/WorkerCommand.php +++ b/core/Command/TaskProcessing/WorkerCommand.php @@ -14,7 +14,6 @@ use OCP\AppFramework\Utility\ITimeFactory; use OCP\IAppConfig; use OCP\TaskProcessing\Exception\Exception; -use OCP\TaskProcessing\Exception\NotFoundException; use OCP\TaskProcessing\IManager; use OCP\TaskProcessing\ISynchronousProvider; use Psr\Log\LoggerInterface; @@ -118,9 +117,10 @@ protected function execute(InputInterface $input, OutputInterface $output): int * Attempt to process one task across all preferred synchronous 
providers. * * To avoid starvation, all eligible task types are first collected and then - * the oldest scheduled task across all of them is fetched in a single query. - * This ensures that tasks are processed in the order they were scheduled, - * regardless of which provider handles them. + * the oldest scheduled task across all of them is claimed in a single atomic + * query (FOR UPDATE SKIP LOCKED, with a SQLite fallback). This ensures tasks + * are processed in the order they were scheduled, regardless of which provider + * handles them, and guarantees no two workers ever claim the same task. * * @param list $taskTypes When non-empty, only providers for these task type IDs are considered. * @return bool True if a task was processed, false if no task was found @@ -162,15 +162,21 @@ private function processNextTask(OutputInterface $output, array $taskTypes = []) return false; } - // Fetch the oldest scheduled task across all eligible task types in one query. - // This naturally prevents starvation: regardless of how many tasks one provider - // has queued, another provider's older tasks will be picked up first. + // Atomically claim the oldest scheduled task across all eligible task types in + // one query. SELECT ... FOR UPDATE SKIP LOCKED (with a SQLite fallback) both + // fetches and marks the task RUNNING, so multiple workers never claim the same + // task and no per-worker ignore-list / retry loop is needed. This also naturally + // prevents starvation: regardless of how many tasks one provider has queued, + // another provider's older tasks are picked up first. 
try { - $task = $this->taskProcessingManager->getNextScheduledTask(array_keys($eligibleProviders)); - } catch (NotFoundException) { - return false; + $task = $this->taskProcessingManager->claimNextScheduledTask(array_keys($eligibleProviders)); } catch (Exception $e) { - $this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]); + $this->logger->error('Unknown error while claiming scheduled TaskProcessing tasks', ['exception' => $e]); + return false; + } + + if ($task === null) { + // No schedulable task available right now. return false; } diff --git a/core/Listener/AddMissingIndicesListener.php b/core/Listener/AddMissingIndicesListener.php index 07d0c42678c66..e35126ab16ded 100644 --- a/core/Listener/AddMissingIndicesListener.php +++ b/core/Listener/AddMissingIndicesListener.php @@ -223,5 +223,11 @@ public function handle(Event $event): void { ['user', 'mountpoint'], ['lengths' => [null, 128]] ); + + $event->addMissingIndex( + 'taskprocessing_tasks', + 'taskp_status_type_upd', + ['status', 'type', 'last_updated'] + ); } } diff --git a/core/Migrations/Version30000Date20240429122720.php b/core/Migrations/Version30000Date20240429122720.php index 7f256b929cbfb..1b665e1e9e71c 100644 --- a/core/Migrations/Version30000Date20240429122720.php +++ b/core/Migrations/Version30000Date20240429122720.php @@ -98,6 +98,7 @@ public function changeSchema(IOutput $output, Closure $schemaClosure, array $opt $table->addIndex(['status', 'type'], 'taskp_tasks_status_type'); $table->addIndex(['last_updated'], 'taskp_tasks_updated'); $table->addIndex(['user_id', 'app_id', 'custom_id'], 'taskp_tasks_uid_appid_cid'); + $table->addIndex(['status', 'type', 'last_updated'], 'taskp_status_type_upd'); return $schema; } diff --git a/lib/private/TaskProcessing/Db/TaskMapper.php b/lib/private/TaskProcessing/Db/TaskMapper.php index 4e5e1c4695af7..954dda61d4f29 100644 --- a/lib/private/TaskProcessing/Db/TaskMapper.php +++ 
b/lib/private/TaskProcessing/Db/TaskMapper.php @@ -15,6 +15,7 @@ use OCP\AppFramework\Db\QBMapper; use OCP\AppFramework\Utility\ITimeFactory; use OCP\DB\Exception; +use OCP\DB\QueryBuilder\ConflictResolutionMode; use OCP\DB\QueryBuilder\IQueryBuilder; use OCP\IDBConnection; @@ -75,6 +76,160 @@ public function findOldestScheduledByType(array $taskTypes, array $taskIdsToIgno return $this->findEntity($qb); } + /** + * Atomically claim the oldest scheduled task of the given task types and mark it RUNNING. + * + * This is the structural fix for the worker "claim loop": instead of every worker + * racing for the single oldest task (a thundering herd that grows a per-worker + * `id NOT IN (...)` ignore list and slows the SELECT), each worker claims a + * *distinct* task in one round trip. + * + * On databases that support row-level locking with SKIP LOCKED + * (MySQL/MariaDB/PostgreSQL) the claim is a single transaction: + * SELECT ... WHERE status = SCHEDULED [AND type IN (...)] + * ORDER BY last_updated ASC LIMIT 1 FOR UPDATE SKIP LOCKED + * followed by a guarded UPDATE to RUNNING. Concurrent workers skip rows already + * locked by another transaction, so no two workers ever claim the same task. + * + * SQLite does not support SKIP LOCKED (verified: Doctrine throws "Operation + * 'SKIP LOCKED' is not supported by platform"), so we feature-detect via the DB + * provider and fall back to the existing bounded {@see lockTask} retry, which is + * still safe because the UPDATE ... WHERE status = SCHEDULED is itself atomic and + * SQLite serialises writers. + * + * A task is only ever transitioned SCHEDULED -> RUNNING here; it is never marked + * FAILED by claiming. If the task cannot be claimed (none scheduled, or it was + * taken by another worker between SELECT and UPDATE) this returns null. + * + * @param list $taskTypes When non-empty, only tasks of these task type IDs are considered. 
+ * @return Task|null The claimed task (status RUNNING), or null if nothing could be claimed. + * @throws Exception + */ + public function claimOldestScheduledTask(array $taskTypes): ?Task { + if ($this->db->getDatabaseProvider() === IDBConnection::PLATFORM_SQLITE) { + // SKIP LOCKED is unsupported on SQLite: fall back to the bounded lock-and-retry claim. + return $this->claimWithBoundedRetry($taskTypes); + } + + return $this->claimWithSkipLocked($taskTypes); + } + + /** + * Atomic claim using FOR UPDATE SKIP LOCKED in a single transaction. + * + * @param list $taskTypes + * @return Task|null + * @throws Exception + */ + private function claimWithSkipLocked(array $taskTypes): ?Task { + $this->db->beginTransaction(); + try { + $qb = $this->db->getQueryBuilder(); + $qb->select(Task::COLUMNS) + ->from($this->tableName) + ->where($qb->expr()->eq('status', $qb->createPositionalParameter(\OCP\TaskProcessing\Task::STATUS_SCHEDULED, IQueryBuilder::PARAM_INT))) + ->orderBy('last_updated', 'ASC') + ->setMaxResults(1) + ->forUpdate(ConflictResolutionMode::SkipLocked); + + if (!empty($taskTypes)) { + $filter = []; + foreach ($taskTypes as $taskType) { + $filter[] = $qb->expr()->eq('type', $qb->createPositionalParameter($taskType)); + } + $qb->andWhere($qb->expr()->orX(...$filter)); + } + + $result = $qb->executeQuery(); + $row = $result->fetch(); + $result->closeCursor(); + + if ($row === false) { + // Nothing schedulable (or every candidate is locked by another worker). + $this->db->commit(); + return null; + } + + /** @var Task $task */ + $task = $this->mapRowToEntity($row); + + // Record the start time at claim time: because the worker receives the task + // already in status RUNNING, the later SCHEDULED -> RUNNING transition in + // Manager::setTaskStatus is skipped and would otherwise never persist started_at. + $startedAt = $this->timeFactory->now()->getTimestamp(); + + // Guarded transition SCHEDULED -> RUNNING. 
The row is locked for this + // transaction, so the guard is belt-and-braces rather than strictly required. + $update = $this->db->getQueryBuilder(); + $update->update($this->tableName) + ->set('status', $update->createPositionalParameter(\OCP\TaskProcessing\Task::STATUS_RUNNING, IQueryBuilder::PARAM_INT)) + ->set('started_at', $update->createPositionalParameter($startedAt, IQueryBuilder::PARAM_INT)) + ->where($update->expr()->eq('id', $update->createPositionalParameter($task->getId(), IQueryBuilder::PARAM_INT))) + ->andWhere($update->expr()->eq('status', $update->createPositionalParameter(\OCP\TaskProcessing\Task::STATUS_SCHEDULED, IQueryBuilder::PARAM_INT))); + $affected = $update->executeStatement(); + + $this->db->commit(); + + if ($affected === 0) { + // Lost the race (should not happen under SKIP LOCKED); leave the task SCHEDULED. + return null; + } + + $task->setStatus(\OCP\TaskProcessing\Task::STATUS_RUNNING); + $task->setStartedAt($startedAt); + return $task; + } catch (\Throwable $e) { + $this->db->rollBack(); + throw $e; + } + } + + /** + * Fallback claim for databases without SKIP LOCKED (SQLite). + * + * Repeatedly fetches the oldest scheduled task and attempts the atomic + * UPDATE ... WHERE status = SCHEDULED. Tasks lost to another worker are added to a + * short ignore list so the next iteration moves on. Bounded to avoid unbounded + * looping under contention. + * + * @param list $taskTypes + * @return Task|null + * @throws Exception + */ + private function claimWithBoundedRetry(array $taskTypes): ?Task { + $taskIdsToIgnore = []; + // A handful of attempts is plenty: on SQLite writers are serialised, so at most + // a few rows can be claimed out from under us before we either win or run dry. 
+ for ($attempt = 0; $attempt < 10; $attempt++) { + try { + $task = $this->findOldestScheduledByType($taskTypes, $taskIdsToIgnore); + } catch (DoesNotExistException) { + return null; + } + + if ($this->lockTask($task) !== 0) { + $task->setStatus(\OCP\TaskProcessing\Task::STATUS_RUNNING); + // Record the start time at claim time. lockTask only flips the status (and is + // shared with other callers), so persist started_at with a targeted follow-up + // UPDATE rather than changing lockTask's behaviour. The worker receives the task + // already RUNNING, so Manager::setTaskStatus would otherwise never write it. + $startedAt = $this->timeFactory->now()->getTimestamp(); + $update = $this->db->getQueryBuilder(); + $update->update($this->tableName) + ->set('started_at', $update->createPositionalParameter($startedAt, IQueryBuilder::PARAM_INT)) + ->where($update->expr()->eq('id', $update->createPositionalParameter($task->getId(), IQueryBuilder::PARAM_INT))); + $update->executeStatement(); + $task->setStartedAt($startedAt); + return $task; + } + + // Another worker took it; skip this id and try the next oldest. 
+ $taskIdsToIgnore[] = $task->getId(); + } + + return null; + } + /** * @param int $id * @param string|null $userId diff --git a/lib/private/TaskProcessing/Manager.php b/lib/private/TaskProcessing/Manager.php index c3e1b78d2be69..4c43fae58f7b4 100644 --- a/lib/private/TaskProcessing/Manager.php +++ b/lib/private/TaskProcessing/Manager.php @@ -1437,6 +1437,21 @@ public function getNextScheduledTasks(array $taskTypeIds = [], array $taskIdsToI } } + #[\Override] + public function claimNextScheduledTask(array $taskTypeIds = []): ?Task { + try { + $taskEntity = $this->taskMapper->claimOldestScheduledTask($taskTypeIds); + if ($taskEntity === null) { + return null; + } + return $taskEntity->toPublicTask(); + } catch (\OCP\DB\Exception $e) { + throw new \OCP\TaskProcessing\Exception\Exception('There was a problem claiming the task', previous: $e); + } catch (\JsonException $e) { + throw new \OCP\TaskProcessing\Exception\Exception('There was a problem parsing JSON after claiming the task', previous: $e); + } + } + /** * Takes task input data and replaces fileIds with File objects * diff --git a/lib/public/TaskProcessing/IManager.php b/lib/public/TaskProcessing/IManager.php index 841d2ac5fed9e..2270bcfd20172 100644 --- a/lib/public/TaskProcessing/IManager.php +++ b/lib/public/TaskProcessing/IManager.php @@ -186,6 +186,22 @@ public function getNextScheduledTask(array $taskTypeIds = [], array $taskIdsToIg */ public function getNextScheduledTasks(array $taskTypeIds = [], array $taskIdsToIgnore = [], int $numberOfTasks = 1): array; + /** + * Atomically claim the oldest scheduled task of the given task types and mark it RUNNING. + * + * Unlike {@see getNextScheduledTask} (which only fetches) this both selects and + * locks the task in one step, so concurrent workers never claim the same task. + * On databases supporting it this uses SELECT ... FOR UPDATE SKIP LOCKED; on + * SQLite it falls back to a bounded lock-and-retry. 
The task is only ever + * transitioned SCHEDULED -> RUNNING; it is never marked FAILED by claiming. + * + * @param list $taskTypeIds When non-empty, only tasks of these task type IDs are considered. + * @return Task|null The claimed task (status RUNNING), or null if nothing could be claimed. + * @throws Exception If the query failed + * @since 34.0.0 + */ + public function claimNextScheduledTask(array $taskTypeIds = []): ?Task; + /** * @param int $id The id of the task * @param string|null $userId The user id that scheduled the task diff --git a/tests/Core/Command/TaskProcessing/WorkerCommandTest.php b/tests/Core/Command/TaskProcessing/WorkerCommandTest.php index ddeb28dfdd08d..b71f06b65397a 100644 --- a/tests/Core/Command/TaskProcessing/WorkerCommandTest.php +++ b/tests/Core/Command/TaskProcessing/WorkerCommandTest.php @@ -13,7 +13,6 @@ use OCP\AppFramework\Utility\ITimeFactory; use OCP\IAppConfig; use OCP\TaskProcessing\Exception\Exception; -use OCP\TaskProcessing\Exception\NotFoundException; use OCP\TaskProcessing\IManager; use OCP\TaskProcessing\ISynchronousProvider; use OCP\TaskProcessing\Task; @@ -89,7 +88,7 @@ public function testOnceProcessesOneTask(): void { ->willReturn($provider); $this->manager->expects($this->once()) - ->method('getNextScheduledTask') + ->method('claimNextScheduledTask') ->with([$taskTypeId]) ->willReturn($task); @@ -120,7 +119,7 @@ public function testSkipsNonSynchronousProviders(): void { ->method('getPreferredProvider'); $this->manager->expects($this->never()) - ->method('getNextScheduledTask'); + ->method('claimNextScheduledTask'); $input = new ArrayInput(['--once' => true], $this->command->getDefinition()); $output = new NullOutput(); @@ -144,9 +143,9 @@ public function testSkipsNonPreferredProviders(): void { ->with($taskTypeId) ->willReturn($preferredProvider); - // provider_a is not preferred (provider_b is), so getNextScheduledTask is never called + // provider_a is not preferred (provider_b is), so claimNextScheduledTask is 
never called $this->manager->expects($this->never()) - ->method('getNextScheduledTask'); + ->method('claimNextScheduledTask'); $input = new ArrayInput(['--once' => true], $this->command->getDefinition()); $output = new NullOutput(); @@ -169,10 +168,11 @@ public function testContinuesWhenNoTaskFound(): void { ->with($taskTypeId) ->willReturn($provider); + // The no-task path is now claimNextScheduledTask returning null (not an exception). $this->manager->expects($this->once()) - ->method('getNextScheduledTask') + ->method('claimNextScheduledTask') ->with([$taskTypeId]) - ->willThrowException(new NotFoundException()); + ->willReturn(null); $this->manager->expects($this->never()) ->method('processTask'); @@ -200,13 +200,13 @@ public function testLogsErrorAndContinuesOnException(): void { $exception = new Exception('DB error'); $this->manager->expects($this->once()) - ->method('getNextScheduledTask') + ->method('claimNextScheduledTask') ->with([$taskTypeId]) ->willThrowException($exception); $this->logger->expects($this->once()) ->method('error') - ->with('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $exception]); + ->with('Unknown error while claiming scheduled TaskProcessing tasks', ['exception' => $exception]); $this->manager->expects($this->never()) ->method('processTask'); @@ -240,9 +240,9 @@ public function testProcessesCorrectProviderForReturnedTaskType(): void { [$taskTypeId2, $provider2], ]); - // All eligible task types are passed in a single query + // All eligible task types are passed in a single atomic claim $this->manager->expects($this->once()) - ->method('getNextScheduledTask') + ->method('claimNextScheduledTask') ->with($this->equalTo([$taskTypeId1, $taskTypeId2])) ->willReturn($task); @@ -278,7 +278,7 @@ public function testTaskTypesWhitelistFiltersProviders(): void { ->willReturn($provider2); $this->manager->expects($this->once()) - ->method('getNextScheduledTask') + ->method('claimNextScheduledTask') 
->with([$taskTypeId2]) ->willReturn($task); @@ -307,7 +307,7 @@ public function testTaskTypesWhitelistWithNoMatchingProviders(): void { ->method('getPreferredProvider'); $this->manager->expects($this->never()) - ->method('getNextScheduledTask'); + ->method('claimNextScheduledTask'); $input = new ArrayInput(['--once' => true, '--taskTypes' => ['type_b']], $this->command->getDefinition()); $output = new NullOutput(); @@ -332,7 +332,7 @@ public function testEmptyTaskTypesAllowsAllProviders(): void { ->willReturn($provider); $this->manager->expects($this->once()) - ->method('getNextScheduledTask') + ->method('claimNextScheduledTask') ->with([$taskTypeId]) ->willReturn($task); diff --git a/tests/lib/TaskProcessing/SkipLockedSqlShapeTest.php b/tests/lib/TaskProcessing/SkipLockedSqlShapeTest.php new file mode 100644 index 0000000000000..444734043e63e --- /dev/null +++ b/tests/lib/TaskProcessing/SkipLockedSqlShapeTest.php @@ -0,0 +1,90 @@ +systemConfig = $this->createMock(SystemConfig::class); + $this->logger = $this->createMock(LoggerInterface::class); + } + + /** + * Build a QueryBuilder backed by a non-SQLite (MySQL 8) platform so the + * generated SQL exposes the locking clause the way it would in production. 
+ */ + private function newMysqlQueryBuilder(): QueryBuilder { + $inner = $this->createMock(Connection::class); + $inner->method('getDatabasePlatform')->willReturn(new MySQL80Platform()); + + $adapter = $this->createMock(ConnectionAdapter::class); + $adapter->method('getInner')->willReturn($inner); + $adapter->method('getDatabaseProvider')->willReturn(IDBConnection::PLATFORM_MYSQL); + + return new QueryBuilder($adapter, $this->systemConfig, $this->logger); + } + + public function testClaimQueryContainsForUpdateSkipLocked(): void { + $qb = $this->newMysqlQueryBuilder(); + $qb->select('id', 'status', 'type', 'last_updated') + ->from('taskprocessing_tasks') + ->where($qb->expr()->eq('status', $qb->createPositionalParameter(1, IQueryBuilder::PARAM_INT))) + ->orderBy('last_updated', 'ASC') + ->setMaxResults(1) + ->forUpdate(ConflictResolutionMode::SkipLocked); + + $sql = $qb->getSQL(); + + self::assertStringContainsString('FOR UPDATE', $sql); + self::assertStringContainsString('SKIP LOCKED', $sql); + } + + public function testOrdinaryForUpdateHasNoSkipLocked(): void { + // Sanity check: only the SkipLocked mode adds the SKIP LOCKED clause. + $qb = $this->newMysqlQueryBuilder(); + $qb->select('id') + ->from('taskprocessing_tasks') + ->setMaxResults(1) + ->forUpdate(ConflictResolutionMode::Ordinary); + + $sql = $qb->getSQL(); + + self::assertStringContainsString('FOR UPDATE', $sql); + self::assertStringNotContainsString('SKIP LOCKED', $sql); + } +} diff --git a/tests/lib/TaskProcessing/TaskProcessingTest.php b/tests/lib/TaskProcessing/TaskProcessingTest.php index 59a0ba1d23728..d6caf4d243576 100644 --- a/tests/lib/TaskProcessing/TaskProcessingTest.php +++ b/tests/lib/TaskProcessing/TaskProcessingTest.php @@ -1620,4 +1620,141 @@ private function configureEventDispatcherMock( } }); } + + /** + * Register a single synchronous provider for TextToText so tasks can be scheduled. 
+ * + * The integration suite shares one database across tests and does not truncate + * between them, so we clear the tasks table first to make the oldest-scheduled + * ordering of the claim deterministic for these focused tests. + */ + private function registerTextToTextProvider(): void { + $db = Server::get(IDBConnection::class); + $db->getQueryBuilder()->delete('taskprocessing_tasks')->executeStatement(); + + $this->appConfig->setValueString('core', 'ai.taskprocessing_type_preferences', '', lazy: true); + $this->registrationContext->expects($this->any())->method('getTaskProcessingProviders')->willReturn([ + new ServiceRegistration('test', SuccessfulSyncProvider::class) + ]); + self::assertTrue($this->manager->hasProviders()); + } + + public function testClaimReturnsNullWhenNoScheduledTask(): void { + $this->registerTextToTextProvider(); + + // No task scheduled => nothing to claim. + self::assertNull($this->manager->claimNextScheduledTask([TextToText::ID])); + self::assertNull($this->manager->claimNextScheduledTask()); + } + + public function testClaimReturnsTaskAndSetsItRunning(): void { + $this->registerTextToTextProvider(); + + $task = new Task(TextToText::ID, ['input' => 'Hello'], 'test', null); + $this->manager->scheduleTask($task); + self::assertEquals(Task::STATUS_SCHEDULED, $task->getStatus()); + $scheduledId = $task->getId(); + + $claimed = $this->manager->claimNextScheduledTask([TextToText::ID]); + self::assertNotNull($claimed); + self::assertEquals($scheduledId, $claimed->getId()); + // The returned task object reports RUNNING ... + self::assertEquals(Task::STATUS_RUNNING, $claimed->getStatus()); + // ... and the change is persisted in the database. + $persisted = $this->manager->getTask($scheduledId); + self::assertEquals(Task::STATUS_RUNNING, $persisted->getStatus()); + } + + public function testClaimNeverReturnsTheSameTaskTwice(): void { + // No-duplicate invariant. 
We cannot run two truly concurrent DB transactions + // inside one PHPUnit process, but the structural guarantee is the same: once a + // task is claimed it is RUNNING (no longer SCHEDULED), so a second claim can + // never return it again. Under real concurrency, FOR UPDATE SKIP LOCKED enforces + // the same property by skipping rows another transaction has locked; that path + // is additionally validated live on nc-ai. + $this->registerTextToTextProvider(); + + $taskA = new Task(TextToText::ID, ['input' => 'A'], 'test', null); + $this->manager->scheduleTask($taskA); + $taskB = new Task(TextToText::ID, ['input' => 'B'], 'test', null); + $this->manager->scheduleTask($taskB); + + $firstClaim = $this->manager->claimNextScheduledTask([TextToText::ID]); + $secondClaim = $this->manager->claimNextScheduledTask([TextToText::ID]); + + self::assertNotNull($firstClaim); + self::assertNotNull($secondClaim); + // Two distinct tasks were handed out, never the same one twice. + self::assertNotEquals($firstClaim->getId(), $secondClaim->getId()); + self::assertEqualsCanonicalizing( + [$taskA->getId(), $taskB->getId()], + [$firstClaim->getId(), $secondClaim->getId()], + ); + + // Both are now RUNNING and the queue is drained. + self::assertEquals(Task::STATUS_RUNNING, $this->manager->getTask($taskA->getId())->getStatus()); + self::assertEquals(Task::STATUS_RUNNING, $this->manager->getTask($taskB->getId())->getStatus()); + self::assertNull($this->manager->claimNextScheduledTask([TextToText::ID])); + } + + public function testClaimNeverMarksTaskFailed(): void { + $this->registerTextToTextProvider(); + + $task = new Task(TextToText::ID, ['input' => 'Hello'], 'test', null); + $this->manager->scheduleTask($task); + $id = $task->getId(); + + $claimed = $this->manager->claimNextScheduledTask([TextToText::ID]); + self::assertNotNull($claimed); + + // Claiming only ever transitions SCHEDULED -> RUNNING, never to FAILED/CANCELLED. 
+ self::assertNotEquals(Task::STATUS_FAILED, $claimed->getStatus()); + self::assertNotEquals(Task::STATUS_CANCELLED, $claimed->getStatus()); + $persisted = $this->manager->getTask($id); + self::assertNotEquals(Task::STATUS_FAILED, $persisted->getStatus()); + self::assertNotEquals(Task::STATUS_CANCELLED, $persisted->getStatus()); + self::assertEquals(Task::STATUS_RUNNING, $persisted->getStatus()); + self::assertNull($persisted->getErrorMessage()); + } + + public function testClaimRespectsTaskTypeFilter(): void { + $this->registerTextToTextProvider(); + + $task = new Task(TextToText::ID, ['input' => 'Hello'], 'test', null); + $this->manager->scheduleTask($task); + + // A type that does not match the only scheduled task must not be claimed. + self::assertNull($this->manager->claimNextScheduledTask(['some:other:tasktype'])); + // The task is still SCHEDULED and claimable without a filter. + self::assertEquals(Task::STATUS_SCHEDULED, $this->manager->getTask($task->getId())->getStatus()); + $claimed = $this->manager->claimNextScheduledTask(); + self::assertNotNull($claimed); + self::assertEquals($task->getId(), $claimed->getId()); + } + + public function testClaimRecordsStartedAt(): void { + $this->registerTextToTextProvider(); + + $task = new Task(TextToText::ID, ['input' => 'Hello'], 'test', null); + $this->manager->scheduleTask($task); + // A scheduled task has not started yet. + self::assertNull($this->manager->getTask($task->getId())->getStartedAt()); + + $before = time(); + $claimed = $this->manager->claimNextScheduledTask([TextToText::ID]); + $after = time(); + + self::assertNotNull($claimed); + // started_at is recorded at claim time on the returned task ... + self::assertNotNull($claimed->getStartedAt()); + self::assertGreaterThanOrEqual($before, $claimed->getStartedAt()); + self::assertLessThanOrEqual($after, $claimed->getStartedAt()); + // ... 
and persisted in the database (since the worker receives the task already + // RUNNING, the later setTaskStatus SCHEDULED -> RUNNING edge is skipped and would + // otherwise never write started_at). + $persisted = $this->manager->getTask($task->getId()); + self::assertNotNull($persisted->getStartedAt()); + self::assertGreaterThanOrEqual($before, $persisted->getStartedAt()); + self::assertLessThanOrEqual($after, $persisted->getStartedAt()); + } } From 57bb450702e4b37752877eab8302a5cfc5e0ba4a Mon Sep 17 00:00:00 2001 From: Marcel Klehr Date: Thu, 11 Jun 2026 20:10:21 +0200 Subject: [PATCH 2/3] fix: Address review comments Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Marcel Klehr --- lib/private/TaskProcessing/Db/TaskMapper.php | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/private/TaskProcessing/Db/TaskMapper.php b/lib/private/TaskProcessing/Db/TaskMapper.php index 954dda61d4f29..3b0c5f546999e 100644 --- a/lib/private/TaskProcessing/Db/TaskMapper.php +++ b/lib/private/TaskProcessing/Db/TaskMapper.php @@ -82,8 +82,7 @@ public function findOldestScheduledByType(array $taskTypes, array $taskIdsToIgno * This is the structural fix for the worker "claim loop": instead of every worker * racing for the single oldest task (a thundering herd that grows a per-worker * `id NOT IN (...)` ignore list and slows the SELECT), each worker claims a - * *distinct* task in one round trip. - * + * *distinct* task in a single claim attempt without a per-worker ignore-list. * On databases that support row-level locking with SKIP LOCKED * (MySQL/MariaDB/PostgreSQL) the claim is a single transaction: * SELECT ... 
WHERE status = SCHEDULED [AND type IN (...)] From f1f1659f420563abcaec724c1693b2bf235ec74e Mon Sep 17 00:00:00 2001 From: Yoan Bozhilov Date: Fri, 12 Jun 2026 09:27:11 +0300 Subject: [PATCH 3/3] fix(taskprocessing): guard lockTask on scheduled, record started_at, Oracle fallback Address review feedback (@marcelklehr, Copilot): - lockTask claims only SCHEDULED tasks (was status != RUNNING) and stamps started_at in the same atomic UPDATE, so a finished task cannot be re-claimed and the external-provider claim path records started_at as well. - claimWithBoundedRetry re-reads after lockTask instead of a follow-up UPDATE. - Oracle joins SQLite on the bounded-retry fallback: Oracle cannot combine a row-limiting clause with FOR UPDATE (ORA-02014), which failed the claim tests on Oracle CI. - Reword the worker docblock/comments to "prefer oldest available" (parallel SKIP LOCKED does not guarantee a strict global order). - Add a regression test that lockTask does not resurrect a finished task. Signed-off-by: Yoan Bozhilov Assisted-by: Claude Code:claude-opus-4-8 --- core/Command/TaskProcessing/WorkerCommand.php | 14 ++--- lib/private/TaskProcessing/Db/TaskMapper.php | 53 +++++++++++-------- .../lib/TaskProcessing/TaskProcessingTest.php | 25 +++++++++ 3 files changed, 64 insertions(+), 28 deletions(-) diff --git a/core/Command/TaskProcessing/WorkerCommand.php b/core/Command/TaskProcessing/WorkerCommand.php index 98bca8f84d1e8..0ca4c43fba0f2 100644 --- a/core/Command/TaskProcessing/WorkerCommand.php +++ b/core/Command/TaskProcessing/WorkerCommand.php @@ -118,9 +118,10 @@ protected function execute(InputInterface $input, OutputInterface $output): int * * To avoid starvation, all eligible task types are first collected and then * the oldest scheduled task across all of them is claimed in a single atomic - * query (FOR UPDATE SKIP LOCKED, with a SQLite fallback). 
This ensures tasks - * are processed in the order they were scheduled, regardless of which provider - * handles them, and guarantees no two workers ever claim the same task. + * query (FOR UPDATE SKIP LOCKED, with a SQLite/Oracle fallback). Each claim prefers the + * oldest available scheduled task -- under parallel workers SKIP LOCKED skips rows + * another worker has locked, so this reduces starvation rather than guaranteeing a + * strict global processing order -- and no two workers ever claim the same task. * * @param list $taskTypes When non-empty, only providers for these task type IDs are considered. * @return bool True if a task was processed, false if no task was found @@ -165,9 +166,10 @@ private function processNextTask(OutputInterface $output, array $taskTypes = []) // Atomically claim the oldest scheduled task across all eligible task types in // one query. SELECT ... FOR UPDATE SKIP LOCKED (with a SQLite fallback) both // fetches and marks the task RUNNING, so multiple workers never claim the same - // task and no per-worker ignore-list / retry loop is needed. This also naturally - // prevents starvation: regardless of how many tasks one provider has queued, - // another provider's older tasks are picked up first. + // task and no per-worker ignore-list / retry loop is needed. This also reduces + // starvation: each claim prefers the oldest available task, so a provider with a + // large queue does not indefinitely block another provider's older tasks (though a + // worker may claim a newer task while an older one is locked by another worker).
try { $task = $this->taskProcessingManager->claimNextScheduledTask(array_keys($eligibleProviders)); } catch (Exception $e) { diff --git a/lib/private/TaskProcessing/Db/TaskMapper.php b/lib/private/TaskProcessing/Db/TaskMapper.php index 3b0c5f546999e..b8898702fb6d3 100644 --- a/lib/private/TaskProcessing/Db/TaskMapper.php +++ b/lib/private/TaskProcessing/Db/TaskMapper.php @@ -90,11 +90,15 @@ public function findOldestScheduledByType(array $taskTypes, array $taskIdsToIgno * followed by a guarded UPDATE to RUNNING. Concurrent workers skip rows already * locked by another transaction, so no two workers ever claim the same task. * - * SQLite does not support SKIP LOCKED (verified: Doctrine throws "Operation - * 'SKIP LOCKED' is not supported by platform"), so we feature-detect via the DB - * provider and fall back to the existing bounded {@see lockTask} retry, which is - * still safe because the UPDATE ... WHERE status = SCHEDULED is itself atomic and - * SQLite serialises writers. + * Two databases cannot use the SKIP LOCKED path and fall back to a bounded + * lock-and-retry claim instead: + * - SQLite has no SKIP LOCKED (Doctrine throws "Operation 'SKIP LOCKED' is not + * supported by platform"). + * - Oracle cannot combine a row-limiting clause with FOR UPDATE: the LIMIT is + * emulated with a ROWNUM sub-select, and selecting FOR UPDATE from that derived + * view raises ORA-02014. + * The fallback is still safe because the UPDATE ... WHERE status = SCHEDULED is itself + * atomic (SQLite additionally serialises writers). * * A task is only ever transitioned SCHEDULED -> RUNNING here; it is never marked * FAILED by claiming. 
If the task cannot be claimed (none scheduled, or it was @@ -105,8 +109,10 @@ public function findOldestScheduledByType(array $taskTypes, array $taskIdsToIgno * @throws Exception */ public function claimOldestScheduledTask(array $taskTypes): ?Task { - if ($this->db->getDatabaseProvider() === IDBConnection::PLATFORM_SQLITE) { - // SKIP LOCKED is unsupported on SQLite: fall back to the bounded lock-and-retry claim. + $provider = $this->db->getDatabaseProvider(); + // SKIP LOCKED is unusable on SQLite (unsupported) and Oracle (LIMIT + FOR UPDATE => + // ORA-02014): both fall back to the bounded lock-and-retry claim. + if ($provider === IDBConnection::PLATFORM_SQLITE || $provider === IDBConnection::PLATFORM_ORACLE) { return $this->claimWithBoundedRetry($taskTypes); } @@ -184,7 +190,7 @@ private function claimWithSkipLocked(array $taskTypes): ?Task { } /** - * Fallback claim for databases without SKIP LOCKED (SQLite). + * Fallback claim for databases that cannot use the SKIP LOCKED path (SQLite, Oracle). * * Repeatedly fetches the oldest scheduled task and attempts the atomic * UPDATE ... WHERE status = SCHEDULED. Tasks lost to another worker are added to a @@ -207,19 +213,9 @@ private function claimWithBoundedRetry(array $taskTypes): ?Task { } if ($this->lockTask($task) !== 0) { - $task->setStatus(\OCP\TaskProcessing\Task::STATUS_RUNNING); - // Record the start time at claim time. lockTask only flips the status (and is - // shared with other callers), so persist started_at with a targeted follow-up - // UPDATE rather than changing lockTask's behaviour. The worker receives the task - // already RUNNING, so Manager::setTaskStatus would otherwise never write it. 
- $startedAt = $this->timeFactory->now()->getTimestamp(); - $update = $this->db->getQueryBuilder(); - $update->update($this->tableName) - ->set('started_at', $update->createPositionalParameter($startedAt, IQueryBuilder::PARAM_INT)) - ->where($update->expr()->eq('id', $update->createPositionalParameter($task->getId(), IQueryBuilder::PARAM_INT))); - $update->executeStatement(); - $task->setStartedAt($startedAt); - return $task; + // lockTask atomically flipped SCHEDULED -> RUNNING and stamped started_at. + // Re-read so the returned task reflects the persisted status and started_at. + return $this->find($task->getId()); } // Another worker took it; skip this id and try the next oldest. @@ -376,12 +372,25 @@ public function update(Entity $entity): Entity { return parent::update($entity); } + /** + * Atomically claim a task by transitioning it SCHEDULED -> RUNNING. + * + * The UPDATE is guarded on `status = SCHEDULED` so a task another worker has already + * finished (SUCCESSFUL/FAILED) between a caller's SELECT and this UPDATE can never be + * re-claimed and processed twice. started_at is stamped in the same statement: the + * worker receives the task already RUNNING, so the later SCHEDULED -> RUNNING edge in + * Manager::setTaskStatus (which used to set started_at) no longer fires. + * + * @return int Number of rows updated: 1 if the task was claimed, 0 if it was no longer scheduled. 
+ */ public function lockTask(Entity $entity): int { + $startedAt = $this->timeFactory->now()->getTimestamp(); $qb = $this->db->getQueryBuilder(); $qb->update($this->tableName) ->set('status', $qb->createPositionalParameter(\OCP\TaskProcessing\Task::STATUS_RUNNING, IQueryBuilder::PARAM_INT)) + ->set('started_at', $qb->createPositionalParameter($startedAt, IQueryBuilder::PARAM_INT)) ->where($qb->expr()->eq('id', $qb->createPositionalParameter($entity->getId(), IQueryBuilder::PARAM_INT))) - ->andWhere($qb->expr()->neq('status', $qb->createPositionalParameter(2, IQueryBuilder::PARAM_INT))); + ->andWhere($qb->expr()->eq('status', $qb->createPositionalParameter(\OCP\TaskProcessing\Task::STATUS_SCHEDULED, IQueryBuilder::PARAM_INT))); try { return $qb->executeStatement(); } catch (Exception) { diff --git a/tests/lib/TaskProcessing/TaskProcessingTest.php b/tests/lib/TaskProcessing/TaskProcessingTest.php index d6caf4d243576..8dc361dd20fb4 100644 --- a/tests/lib/TaskProcessing/TaskProcessingTest.php +++ b/tests/lib/TaskProcessing/TaskProcessingTest.php @@ -1757,4 +1757,29 @@ public function testClaimRecordsStartedAt(): void { self::assertGreaterThanOrEqual($before, $persisted->getStartedAt()); self::assertLessThanOrEqual($after, $persisted->getStartedAt()); } + + public function testLockTaskDoesNotResurrectFinishedTask(): void { + // Regression guard for the lockTask claim path (used by the SQLite fallback and the + // external-provider API claim). lockTask must only ever transition SCHEDULED -> RUNNING. + // If another worker finished a task (SUCCESSFUL/FAILED) between the SELECT and this + // UPDATE, lockTask must NOT flip it back to RUNNING -- otherwise a completed task is + // resurrected and processed twice. (The previous `status != RUNNING` guard let a + // SUCCESSFUL/FAILED row be re-locked.) 
+ $this->registerTextToTextProvider(); + + $task = new Task(TextToText::ID, ['input' => 'Hello'], 'test', null); + $this->manager->scheduleTask($task); + $id = $task->getId(); + + // Simulate another worker having already finished the task. + $entity = $this->taskMapper->find($id); + $entity->setStatus(Task::STATUS_SUCCESSFUL); + $this->taskMapper->update($entity); + + // Attempting to claim the (now SUCCESSFUL) task must be a no-op. + $affected = $this->taskMapper->lockTask($this->taskMapper->find($id)); + + self::assertSame(0, $affected, 'lockTask must not claim a task that is no longer SCHEDULED'); + self::assertEquals(Task::STATUS_SUCCESSFUL, $this->manager->getTask($id)->getStatus()); + } }