
Queue.process

Grant Carthew edited this page Aug 23, 2016 · 43 revisions

Method Signature

Queue.process(Handler)

Parameter: Handler Function

  • A job processing Function supporting two parameters, job and next.

  • The Handler signature is function (job, next) { }

Returns: Promise => true

  • This Promise is from Queue.ready() ensuring the Queue is in a ready state.

Example:

q.process((job, next) => {
  // Process the job here, call next when done
}).catch((err) => {
  console.error(err)
})

Description

This is the business end of rethinkdb-job-queue, where your jobs get processed. Any type of job can be used with the Queue, from sending e-mails to processing files to training machine learning models.

The function you pass into Queue.process() will carry out your job processing. The details required for your job can be obtained from the Job object that is passed into your process handler. For example, if you populated your jobs with a Job.recipient property, you read the job.recipient property in your handling function.
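
As a minimal sketch of reading a custom property inside a handler, the snippet below defines a hypothetical greetingHandler and exercises it with stand-in job and next objects, so it runs without a database. The handler name, the stand-ins, and the e-mail address are illustrative assumptions, not part of rethinkdb-job-queue.

```javascript
// Hypothetical handler: reads a custom `recipient` property from the job
function greetingHandler (job, next) {
  if (!job.recipient) {
    // Fail the job when the expected property is missing
    return next(new Error('Job has no recipient'))
  }
  // A real handler would do its work here before reporting the result
  return next('Greeting prepared for ' + job.recipient)
}

// Stand-ins so the handler can be exercised without a Queue or database
const results = []
const recordNext = (result) => { results.push(result) }

greetingHandler({ recipient: 'batman@batcave.com' }, recordNext)
greetingHandler({}, recordNext)
```

In real use the same function would be handed to the Queue with q.process(greetingHandler).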

Whilst processing your job you can call the Job.setProgress method on the Job object to update the progress in the database. Be careful about how often you call Job.setProgress because each call touches the database. Another major feature of Job.setProgress is that it resets the job timeout process and the dateRetry property for the job in the database. This prevents a long-running job from timing out and being classified as 'failed' by the Queue Master database review process.
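
One way to limit database traffic is to report progress only when it has moved by a set amount. The sketch below assumes Job.setProgress accepts a percentage and returns a Promise; makeProgressReporter and the stand-in job are hypothetical names used only for illustration.

```javascript
// Hypothetical helper: calls setProgress at most once per `step` percent
function makeProgressReporter (job, step) {
  let lastReported = 0
  return function report (done, total) {
    const percent = Math.floor((done * 100) / total)
    if (percent - lastReported >= step) {
      lastReported = percent
      // Assumption: setProgress takes a percentage and returns a Promise
      return job.setProgress(percent)
    }
    // Nothing was sent to the database for this call
    return Promise.resolve(false)
  }
}

// Stand-in job that records calls instead of touching a database
const progressCalls = []
const fakeJob = {
  setProgress (percent) {
    progressCalls.push(percent)
    return Promise.resolve(true)
  }
}

// 200 work items produce only four progress updates with a 25 percent step
const report = makeProgressReporter(fakeJob, 25)
for (let i = 1; i <= 200; i++) {
  report(i, 200)
}
```

Because each Job.setProgress call also resets the job timeout, a step that is too coarse for a slow job could still let it time out, so the step should be chosen with the Queue timeout in mind.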

On completion of your processing, call the next(jobResult) function to let the Queue know the job has completed or failed.

next(jobResult)

Returns: Promise => Number

  • The number of running jobs for the Queue object.

When your job has finished being processed you inform the Queue that the job has completed or failed by calling the next() function within your process handler. There is no need to catch errors returned by the next() Promise. They will be raised as error events.

For information purposes, the next() Promise resolves to the total number of currently running jobs in the Queue object.

Completed Job

If the job processing completed successfully, call next(), optionally passing in the job result as the only argument. The result will be added as a log entry for the job.

next()
// or
next('Radiation levels are nominal')

Failed Job

If the job processing has failed, create a new error object and pass it in as the only argument. The error.message property will be logged against the job.

const err = new Error('Watch out for them gamma rays')
next(err)

This will fail the job in the database and begin the Job Retry process.

Failed Job with Cancel

If the job is unrecoverable and you wish to cancel it, preventing any retry attempt, populate the Error object with a cancelJob property containing a cancel reason message. If you don't want to supply a reason message, just set cancelJob to true.

const err = new Error('Watch out for them gamma rays')
err.cancelJob = 'Turned into the big green guy'
next(err)

The err.cancelJob property is detected by the Queue and will change the job's status from 'failed' to 'terminated', preventing retries.
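
The three outcomes above (completed, failed, failed with cancel) can be gathered into one small helper. This is only a sketch: finishJob, the 'EINVALIDINPUT' error code, and the stand-in next function are hypothetical and not part of rethinkdb-job-queue.

```javascript
// Hypothetical helper: reports the outcome of a job through next()
function finishJob (next, err, result) {
  if (!err) {
    // Completed: the result is added as a log entry for the job
    return next(result)
  }
  // Assumption: errors carrying this made-up code can never succeed on retry
  if (err.code === 'EINVALIDINPUT') {
    err.cancelJob = 'Input is invalid, retrying will not help'
  }
  // With cancelJob set the job is terminated, otherwise the retry process begins
  return next(err)
}

// Stand-in for next() so the helper can be exercised without a Queue
const outcomes = []
const stubNext = (value) => { outcomes.push(value) }

const retryable = new Error('Connection timed out')
const fatal = new Error('Malformed payload')
fatal.code = 'EINVALIDINPUT'

finishJob(stubNext, null, 'Radiation levels are nominal')
finishJob(stubNext, retryable)
finishJob(stubNext, fatal)
```

Routing every exit path through one function like this helps ensure next() is called exactly once per job.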

Examples

E-Mail Job Processing

This example shows the full functionality of the rethinkdb-job-queue for processing an email job. It is just an example that was copied from the mailgun-js README with Queue functionality added. It may not work even with your api_key.

const api_key = 'key-XXXXXXXXXXXXXXXXXXXXXXX'
const domain = 'mydomain.mailgun.org'
const mailgun = require('mailgun-js')({apiKey: api_key, domain: domain});
const Queue = require('rethinkdb-job-queue')
const q = new Queue()
const job = q.createJob()

// Here we attach the jobs payload. In this case mailgun data.
job.data = {
  from: 'Super Heros <[email protected]>',
  to: '[email protected]',
  subject: 'Registration for superheros.com',
  text: 'Click this link to activate your account on superheros.com'
}

q.addJob(job).catch((err) => {
  console.error(err)
})

// Queue.process can be placed before or after adding jobs
q.process((job, next) => {
  mailgun.messages().send(job.data, function (err, body) {
    if (err) {
      // Cancel the job outright when a retry cannot help
      if (err.message === 'something really bad') {
        err.cancelJob = 'something really bad happened'
      }
      // Return here so next() is not called a second time below
      return next(err)
    }
    console.log(body)
    next('email sent successfully')
  })
}).catch((err) => {
  console.error(err)
})
