Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 19 additions & 11 deletions core/Command/TaskProcessing/WorkerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
use OCP\AppFramework\Utility\ITimeFactory;
use OCP\IAppConfig;
use OCP\TaskProcessing\Exception\Exception;
use OCP\TaskProcessing\Exception\NotFoundException;
use OCP\TaskProcessing\IManager;
use OCP\TaskProcessing\ISynchronousProvider;
use Psr\Log\LoggerInterface;
Expand Down Expand Up @@ -118,9 +117,11 @@ protected function execute(InputInterface $input, OutputInterface $output): int
* Attempt to process one task across all preferred synchronous providers.
*
* To avoid starvation, all eligible task types are first collected and then
* the oldest scheduled task across all of them is fetched in a single query.
* This ensures that tasks are processed in the order they were scheduled,
* regardless of which provider handles them.
* the oldest scheduled task across all of them is claimed in a single atomic
* step (SELECT ... FOR UPDATE SKIP LOCKED, with a bounded lock-and-retry fallback
* on SQLite and Oracle). Each claim prefers the oldest available scheduled task.
* Under parallel workers SKIP LOCKED skips rows another worker has locked, so this
* reduces starvation rather than guaranteeing a strict global processing order.
* No two workers ever claim the same task.
*
* @param list<string> $taskTypes When non-empty, only providers for these task type IDs are considered.
* @return bool True if a task was processed, false if no task was found
Expand Down Expand Up @@ -162,15 +163,22 @@ private function processNextTask(OutputInterface $output, array $taskTypes = [])
return false;
}

// Fetch the oldest scheduled task across all eligible task types in one query.
// This naturally prevents starvation: regardless of how many tasks one provider
// has queued, another provider's older tasks will be picked up first.
// Atomically claim the oldest scheduled task across all eligible task types in
// one step. SELECT ... FOR UPDATE SKIP LOCKED (with a bounded lock-and-retry
// fallback on SQLite and Oracle) both fetches the task and marks it RUNNING, so
// multiple workers never claim the same task and the worker needs no ignore-list
// or retry loop of its own. This also reduces starvation: each claim prefers the
// oldest available task, so a provider with a large queue does not indefinitely
// block another provider's older tasks (though a worker may claim a newer task
// while an older one is locked by another worker).
try {
$task = $this->taskProcessingManager->getNextScheduledTask(array_keys($eligibleProviders));
} catch (NotFoundException) {
return false;
$task = $this->taskProcessingManager->claimNextScheduledTask(array_keys($eligibleProviders));
} catch (Exception $e) {
$this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]);
$this->logger->error('Unknown error while claiming scheduled TaskProcessing tasks', ['exception' => $e]);
return false;
}

if ($task === null) {
// No schedulable task available right now.
return false;
}

Expand Down
6 changes: 6 additions & 0 deletions core/Listener/AddMissingIndicesListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -223,5 +223,11 @@ public function handle(Event $event): void {
['user', 'mountpoint'],
['lengths' => [null, 128]]
);

$event->addMissingIndex(
'taskprocessing_tasks',
'taskp_status_type_upd',
['status', 'type', 'last_updated']
);
}
}
1 change: 1 addition & 0 deletions core/Migrations/Version30000Date20240429122720.php
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public function changeSchema(IOutput $output, Closure $schemaClosure, array $opt
$table->addIndex(['status', 'type'], 'taskp_tasks_status_type');
$table->addIndex(['last_updated'], 'taskp_tasks_updated');
$table->addIndex(['user_id', 'app_id', 'custom_id'], 'taskp_tasks_uid_appid_cid');
$table->addIndex(['status', 'type', 'last_updated'], 'taskp_status_type_upd');

return $schema;
}
Expand Down
165 changes: 164 additions & 1 deletion lib/private/TaskProcessing/Db/TaskMapper.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use OCP\AppFramework\Db\QBMapper;
use OCP\AppFramework\Utility\ITimeFactory;
use OCP\DB\Exception;
use OCP\DB\QueryBuilder\ConflictResolutionMode;
use OCP\DB\QueryBuilder\IQueryBuilder;
use OCP\IDBConnection;

Expand Down Expand Up @@ -75,6 +76,155 @@ public function findOldestScheduledByType(array $taskTypes, array $taskIdsToIgno
return $this->findEntity($qb);
}

/**
* Atomically claim the oldest scheduled task of the given task types and mark it RUNNING.
*
* This is the structural fix for the worker "claim loop": instead of every worker
* racing for the single oldest task (a thundering herd that grows a per-worker
* `id NOT IN (...)` ignore list and slows the SELECT), each worker claims a
* *distinct* task in a single claim attempt without a per-worker ignore-list.
* On databases that support row-level locking with SKIP LOCKED
* (MySQL/MariaDB/PostgreSQL) the claim is a single transaction:
* SELECT ... WHERE status = SCHEDULED [AND type IN (...)]
* ORDER BY last_updated ASC LIMIT 1 FOR UPDATE SKIP LOCKED
* followed by a guarded UPDATE to RUNNING. Concurrent workers skip rows already
* locked by another transaction, so no two workers ever claim the same task.
*
* Two databases cannot use the SKIP LOCKED path and fall back to a bounded
* lock-and-retry claim instead:
* - SQLite has no SKIP LOCKED (Doctrine throws "Operation 'SKIP LOCKED' is not
* supported by platform").
* - Oracle cannot combine a row-limiting clause with FOR UPDATE: the LIMIT is
* emulated with a ROWNUM sub-select, and selecting FOR UPDATE from that derived
* view raises ORA-02014.
* The fallback is still safe because the UPDATE ... WHERE status = SCHEDULED is itself
* atomic (SQLite additionally serialises writers).
*
* A task is only ever transitioned SCHEDULED -> RUNNING here; it is never marked
* FAILED by claiming. If the task cannot be claimed (none scheduled, or it was
* taken by another worker between SELECT and UPDATE) this returns null.
*
* @param list<string> $taskTypes When non-empty, only tasks of these task type IDs are considered.
* @return Task|null The claimed task (status RUNNING), or null if nothing could be claimed.
* @throws Exception
*/
public function claimOldestScheduledTask(array $taskTypes): ?Task {
	// Platforms on which the SKIP LOCKED claim cannot be used: SQLite does not
	// support it, and Oracle rejects LIMIT combined with FOR UPDATE (ORA-02014).
	$platformsWithoutSkipLocked = [
		IDBConnection::PLATFORM_SQLITE,
		IDBConnection::PLATFORM_ORACLE,
	];

	$useFallback = in_array($this->db->getDatabaseProvider(), $platformsWithoutSkipLocked, true);

	// Those platforms take the bounded lock-and-retry claim; everything else
	// claims inside a single FOR UPDATE SKIP LOCKED transaction.
	return $useFallback
		? $this->claimWithBoundedRetry($taskTypes)
		: $this->claimWithSkipLocked($taskTypes);
}

/**
 * Claim one scheduled task inside a single transaction using FOR UPDATE SKIP LOCKED.
 *
 * Selects the scheduled row with the smallest last_updated (optionally restricted to
 * the given task types), locking it while skipping rows locked by other transactions,
 * then flips it to RUNNING and stamps started_at with a status-guarded UPDATE.
 *
 * @param list<string> $taskTypes When non-empty, only rows whose type equals one of these IDs are selected.
 * @return Task|null The claimed task with status RUNNING and started_at set, or null when
 *                   no row was selected or the guarded UPDATE affected no rows.
 * @throws Exception
 */
private function claimWithSkipLocked(array $taskTypes): ?Task {
	$this->db->beginTransaction();
	try {
		// Candidate lookup. The status parameter must be created before the type
		// parameters so positional binding matches the order of the WHERE clause.
		$select = $this->db->getQueryBuilder();
		$select->select(Task::COLUMNS)
			->from($this->tableName)
			->where($select->expr()->eq(
				'status',
				$select->createPositionalParameter(\OCP\TaskProcessing\Task::STATUS_SCHEDULED, IQueryBuilder::PARAM_INT)
			))
			->orderBy('last_updated', 'ASC')
			->setMaxResults(1)
			->forUpdate(ConflictResolutionMode::SkipLocked);

		if ($taskTypes !== []) {
			// One "type = ?" condition per requested task type, OR-ed together.
			// array_values() keeps the spread below purely positional.
			$typeConditions = array_values(array_map(
				fn ($taskType) => $select->expr()->eq('type', $select->createPositionalParameter($taskType)),
				$taskTypes
			));
			$select->andWhere($select->expr()->orX(...$typeConditions));
		}

		$cursor = $select->executeQuery();
		$row = $cursor->fetch();
		$cursor->closeCursor();

		if ($row === false) {
			// Either nothing is scheduled or every candidate row is locked elsewhere.
			$this->db->commit();
			return null;
		}

		/** @var Task $claimed */
		$claimed = $this->mapRowToEntity($row);

		// The task is handed out already RUNNING, so started_at is persisted as
		// part of the claim itself.
		$claimTimestamp = $this->timeFactory->now()->getTimestamp();

		// SCHEDULED -> RUNNING, guarded on the current status. The row lock already
		// protects the row within this transaction; the guard is an extra safety net.
		$transition = $this->db->getQueryBuilder();
		$transition->update($this->tableName)
			->set('status', $transition->createPositionalParameter(\OCP\TaskProcessing\Task::STATUS_RUNNING, IQueryBuilder::PARAM_INT))
			->set('started_at', $transition->createPositionalParameter($claimTimestamp, IQueryBuilder::PARAM_INT))
			->where($transition->expr()->eq('id', $transition->createPositionalParameter($claimed->getId(), IQueryBuilder::PARAM_INT)))
			->andWhere($transition->expr()->eq('status', $transition->createPositionalParameter(\OCP\TaskProcessing\Task::STATUS_SCHEDULED, IQueryBuilder::PARAM_INT)));
		$updatedRows = $transition->executeStatement();

		$this->db->commit();

		if ($updatedRows === 0) {
			// The guard did not match (not expected under SKIP LOCKED); the task is left untouched.
			return null;
		}

		// Mirror the persisted state on the entity that is handed back.
		$claimed->setStatus(\OCP\TaskProcessing\Task::STATUS_RUNNING);
		$claimed->setStartedAt($claimTimestamp);
		return $claimed;
	} catch (\Throwable $error) {
		// Undo the open transaction and let the caller see the original failure.
		$this->db->rollBack();
		throw $error;
	}
}

/**
 * Fallback claim for databases that cannot use the SKIP LOCKED path (SQLite, Oracle).
 *
 * Repeatedly fetches the oldest scheduled task and attempts the atomic
 * UPDATE ... WHERE status = SCHEDULED through lockTask(). A task that could not be
 * handed out (taken by another worker, or deleted before it could be re-read) is added
 * to a short ignore list so the next iteration moves on. The number of attempts is
 * bounded to avoid unbounded looping under contention.
 *
 * @param list<string> $taskTypes Task type IDs passed on to findOldestScheduledByType().
 * @return Task|null The claimed task as re-read after the claim, or null if no scheduled
 *                   task was found or the attempt budget was exhausted.
 * @throws Exception
 */
private function claimWithBoundedRetry(array $taskTypes): ?Task {
	// A handful of attempts is plenty: this path only serves SQLite and Oracle, and on
	// SQLite writers are serialised, so at most a few rows can be claimed out from
	// under us before we either win or run dry.
	$maxAttempts = 10;
	$taskIdsToIgnore = [];

	for ($attempt = 0; $attempt < $maxAttempts; $attempt++) {
		try {
			$task = $this->findOldestScheduledByType($taskTypes, $taskIdsToIgnore);
		} catch (DoesNotExistException) {
			// No scheduled task is left outside the ignore list.
			return null;
		}

		if ($this->lockTask($task) !== 0) {
			// lockTask atomically flipped SCHEDULED -> RUNNING and stamped started_at.
			// Re-read so the returned task reflects the persisted status and started_at.
			try {
				return $this->find($task->getId());
			} catch (DoesNotExistException) {
				// The task was removed between the claim and the re-read. There is
				// nothing to hand out for this id, so fall through and try the next
				// candidate instead of letting a non-DB exception escape to callers.
			}
		}

		// Either another worker took it or it vanished; skip this id and try the next oldest.
		$taskIdsToIgnore[] = $task->getId();
	}

	return null;
}

/**
* @param int $id
* @param string|null $userId
Expand Down Expand Up @@ -222,12 +372,25 @@ public function update(Entity $entity): Entity {
return parent::update($entity);
}

/**
* Atomically claim a task by transitioning it SCHEDULED -> RUNNING.
*
* The UPDATE is guarded on `status = SCHEDULED` so a task another worker has already
* finished (SUCCESSFUL/FAILED) between a caller's SELECT and this UPDATE can never be
* re-claimed and processed twice. started_at is stamped in the same statement: the
* worker receives the task already RUNNING, so the later SCHEDULED -> RUNNING edge in
* Manager::setTaskStatus (which used to set started_at) no longer fires.
*
* @return int Number of rows updated: 1 if the task was claimed, 0 if it was no longer scheduled.
*/
public function lockTask(Entity $entity): int {
$startedAt = $this->timeFactory->now()->getTimestamp();
$qb = $this->db->getQueryBuilder();
$qb->update($this->tableName)
->set('status', $qb->createPositionalParameter(\OCP\TaskProcessing\Task::STATUS_RUNNING, IQueryBuilder::PARAM_INT))
->set('started_at', $qb->createPositionalParameter($startedAt, IQueryBuilder::PARAM_INT))
->where($qb->expr()->eq('id', $qb->createPositionalParameter($entity->getId(), IQueryBuilder::PARAM_INT)))
->andWhere($qb->expr()->neq('status', $qb->createPositionalParameter(2, IQueryBuilder::PARAM_INT)));
->andWhere($qb->expr()->eq('status', $qb->createPositionalParameter(\OCP\TaskProcessing\Task::STATUS_SCHEDULED, IQueryBuilder::PARAM_INT)));
try {
return $qb->executeStatement();
} catch (Exception) {
Expand Down
15 changes: 15 additions & 0 deletions lib/private/TaskProcessing/Manager.php
Original file line number Diff line number Diff line change
Expand Up @@ -1437,6 +1437,21 @@ public function getNextScheduledTasks(array $taskTypeIds = [], array $taskIdsToI
}
}

/**
 * Claims the next scheduled task through the task mapper and converts it to a public Task.
 *
 * Database and JSON failures are rethrown as TaskProcessing exceptions that carry the
 * original error as `previous`.
 */
#[\Override]
public function claimNextScheduledTask(array $taskTypeIds = []): ?Task {
	try {
		// The mapper yields null when nothing could be claimed; the null-safe call
		// passes that null straight through without attempting a conversion.
		return $this->taskMapper
			->claimOldestScheduledTask($taskTypeIds)
			?->toPublicTask();
	} catch (\JsonException $jsonError) {
		throw new \OCP\TaskProcessing\Exception\Exception('There was a problem parsing JSON after claiming the task', previous: $jsonError);
	} catch (\OCP\DB\Exception $dbError) {
		throw new \OCP\TaskProcessing\Exception\Exception('There was a problem claiming the task', previous: $dbError);
	}
}

/**
* Takes task input data and replaces fileIds with File objects
*
Expand Down
16 changes: 16 additions & 0 deletions lib/public/TaskProcessing/IManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,22 @@ public function getNextScheduledTask(array $taskTypeIds = [], array $taskIdsToIg
*/
public function getNextScheduledTasks(array $taskTypeIds = [], array $taskIdsToIgnore = [], int $numberOfTasks = 1): array;

/**
 * Atomically claim the oldest scheduled task of the given task types and mark it RUNNING.
 *
 * Unlike {@see getNextScheduledTask} (which only fetches) this both selects and
 * locks the task in one step, so concurrent workers never claim the same task.
 * On databases supporting it this uses SELECT ... FOR UPDATE SKIP LOCKED; on
 * SQLite and Oracle it falls back to a bounded lock-and-retry. The task is only
 * ever transitioned SCHEDULED -> RUNNING; it is never marked FAILED by claiming.
 *
 * @param list<string> $taskTypeIds When non-empty, only tasks of these task type IDs are considered.
 * @return Task|null The claimed task (status RUNNING), or null if nothing could be claimed.
 * @throws Exception If the query failed
 * @since 34.0.0
 */
public function claimNextScheduledTask(array $taskTypeIds = []): ?Task;

/**
* @param int $id The id of the task
* @param string|null $userId The user id that scheduled the task
Expand Down
Loading
Loading