FR: Acknowledging jobs outside of worker function #685

@rauanmayemir

Description

Currently, a job is considered 'done' as soon as worker.Work() returns.
I need a different approach, similar to what AMQP supports, where pulling a job is decoupled from acknowledging (or negatively acknowledging) it.

Ideally, some sort of AsyncWorker would help a lot, especially if it could be driven by channels, i.e. something like:

consumeChan, err = riverClient.Consume(ctx, &river.ConsumeOpts{
  // I'm not sure this makes sense here in case of pg notify
  MaxPrefetchCount: 10,
  // Similar to `RescueStuckJobsAfter`, nack the stuck jobs after a timeout
  NackTimeout: 60 * time.Second,
})

go func() {
  for asyncJob := range consumeChan {
    processJob(asyncJob)
  }
}()


func processJob(asyncJob *river.AsyncJob[AsyncJobArgs]) {
    // autonomous `worker.Work` behaviour
    var err error
    // process the job, setting err on failure
    // ...

    if err != nil {
      // retry later; return so the job is not also completed below
      _ = asyncJob.JobSnooze(err)
      // too many retries
      // _ = asyncJob.JobCancel(err)
      return
    }

    err = asyncJob.JobComplete()
    if err != nil {
      log.Debug("failed to complete the job, channel is probably closed")
    }
}
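
To make the requested semantics concrete, here is a minimal in-memory sketch of "pull now, settle later" using only the standard library. It does not use River or Postgres, and every name in it (`Delivery`, `Consume`, `Complete`, `Snooze`, `Status`, `ErrAlreadySettled`) is hypothetical and is not part of River's API. It only illustrates that a job handed out over a channel can be settled exactly once, outside of any worker callback.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// State is the outcome recorded for a delivery (hypothetical type).
type State int

const (
	Pending State = iota
	Completed
	Snoozed
)

// ErrAlreadySettled is returned when a delivery is settled a second time.
var ErrAlreadySettled = errors.New("delivery already settled")

// Delivery is a hypothetical handle for one pulled job. The consumer
// decides when to call Complete or Snooze; pulling does not imply done.
type Delivery struct {
	ID    int
	mu    sync.Mutex
	state State
}

// settle records the outcome once; later attempts are rejected.
func (d *Delivery) settle(s State) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.state != Pending {
		return ErrAlreadySettled
	}
	d.state = s
	return nil
}

// Complete acknowledges the job.
func (d *Delivery) Complete() error { return d.settle(Completed) }

// Snooze negatively acknowledges the job so it could be retried later.
func (d *Delivery) Snooze() error { return d.settle(Snoozed) }

// Status reports the current state of the delivery.
func (d *Delivery) Status() State {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.state
}

// Consume hands jobs out on a channel. The buffer size plays the role of
// a prefetch limit: at most prefetch jobs are queued ahead of the consumer.
func Consume(jobs []*Delivery, prefetch int) <-chan *Delivery {
	ch := make(chan *Delivery, prefetch)
	go func() {
		defer close(ch)
		for _, j := range jobs {
			ch <- j
		}
	}()
	return ch
}

func main() {
	jobs := []*Delivery{{ID: 1}, {ID: 2}, {ID: 3}}
	for d := range Consume(jobs, 2) {
		// Settle each job explicitly, independent of any Work() return.
		if d.ID%2 == 0 {
			_ = d.Snooze()
		} else {
			_ = d.Complete()
		}
	}
	for _, d := range jobs {
		fmt.Println(d.ID, d.Status())
	}
}
```

A real implementation would also need the nack timeout from the proposal above, so that deliveries which are pulled but never settled are eventually returned to the queue. This sketch leaves that out.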
