Queue.process
Parameter: Handler Function
- A job processing Function supporting two parameters, job and next.
- The Handler signature is function (job, next) { }
Returns: Promise => true
- This Promise is from Queue.ready() ensuring the Queue is in a ready state.
Example:
q.process((job, next) => {
  // Process the job here, call next when done
}).catch((err) => {
  console.error(err)
})
This is the business end of rethinkdb-job-queue, where your jobs get processed. Any type of job can be used with the Queue, from sending e-mails to processing files to training machine learning models.
The function you pass into Queue.process() will carry out your job processing. The details required for your job can be obtained from the Job object that is passed into your process handler. If you populated your jobs with a Job.recipient property, then you use the job.recipient property in your handling function.
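As a minimal sketch of the paragraph above, the handler below reads a custom recipient property from the job and reports the outcome through next. The names welcomeHandler, results, and stubNext, and the example address, are hypothetical; the job and next arguments are plain stand-ins so the snippet runs without RethinkDB.

```javascript
// Hypothetical handler following the (job, next) signature described above.
function welcomeHandler (job, next) {
  if (!job.recipient) {
    // Report a failure by passing an Error to next
    return next(new Error('Job has no recipient'))
  }
  const message = `Welcome email prepared for ${job.recipient}`
  // Report success by passing the result to next
  return next(message)
}

// Stand-ins (assumptions) that record what the handler passes to next.
const results = []
const stubNext = (outcome) => { results.push(outcome) }

welcomeHandler({ recipient: 'user@example.com' }, stubNext)
welcomeHandler({}, stubNext)
```

With a real Queue object, the same function would be passed in as q.process(welcomeHandler).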
Whilst processing your job you can call the Job.updateProgress method on the Job object to update the progress in the database. Be careful about how often you call Job.updateProgress, because each call touches the database. Job.updateProgress also resets the job timeout process and the dateRetry property for the job in the database. This prevents a long-running job from timing out and being classified as 'failed' by the Queue Master database review process.
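One way to limit how often progress updates touch the database is to forward only the updates that move the value by a minimum step. The sketch below is an illustration, not part of rethinkdb-job-queue: createProgressReporter is a hypothetical helper, and the report callback is a stand-in for the job's progress method.

```javascript
// Returns a function that forwards a progress value only when it has moved
// by at least `step` percentage points since the last forwarded value,
// or when it reaches 100.
function createProgressReporter (report, step = 10) {
  let lastReported = null
  return function (percent) {
    const movedEnough = lastReported === null || percent - lastReported >= step
    if (movedEnough || percent === 100) {
      lastReported = percent
      report(percent)
    }
  }
}

// Stand-in for the job's progress method (assumption): records each call.
const sent = []
const reportProgress = createProgressReporter((p) => sent.push(p), 25)

// 21 progress ticks from the work loop result in only 5 forwarded updates.
for (let percent = 0; percent <= 100; percent += 5) {
  reportProgress(percent)
}
```

Inside a process handler, the report callback would wrap the call that updates the job's progress. Updates still need to arrive more often than the job timeout for the timeout reset described above to help.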
On completion of your processing call the next(err, data) function to let the Queue know you have completed the job or it has failed.
Returns: Promise => Number
- The number of running jobs for the Queue object.
When your job has finished being processed you inform the Queue that the job has completed or failed by calling the next() function within your process handler. There is no need to catch errors returned by the next() Promise. They will be raised as error events.
Just for information purposes the next() Promise resolves to the total number of currently running jobs in the Queue object.
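Since next() is what tells the Queue that a job has finished, it seems sensible for a handler to call it only once per job. The text above does not say how the Queue reacts to a second call, so the guard below is a defensive sketch under that assumption. The names once, recordedCalls, and guardedNext are hypothetical.

```javascript
// Wraps a next-style callback so that only the first call is forwarded.
function once (next) {
  let called = false
  return function (...args) {
    if (called) return undefined
    called = true
    return next(...args)
  }
}

// Stand-in for the real next function (assumption): records its arguments.
const recordedCalls = []
const guardedNext = once((outcome) => {
  recordedCalls.push(outcome)
  return recordedCalls.length
})

guardedNext(new Error('first call wins'))
guardedNext('ignored second call')
```

In a handler this would look like const done = once(next), with done used wherever next would otherwise be called.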
If the job processing completed successfully, simply call next() and pass in the job result as the only argument for logging purposes. The result will be added as a log entry for the job.
next()
// or
next('Radiation levels are nominal')
If the job processing has failed, create a new error object and pass it in as the only argument. The error.message property will be logged against the job.
const err = new Error('Watch out for them gamma rays')
next(err)
This will fail the job in the database and begin the Job Retry process.
If the job is unrecoverable and you wish to cancel it preventing any retry attempt, populate the Error object with a cancelJob property containing a cancel reason message. If you don't want to supply a reason message, just set cancelJob to true.
const err = new Error('Watch out for them gamma rays')
err.cancelJob = 'Turned into the big green guy'
next(err)
The err.cancelJob property is detected by the Queue and will change the job's status from 'failed' to 'terminated', preventing retries.
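The three ways of calling next described above can be summarised in a small sketch. Both functions are hypothetical illustrations and not the library's code: unrecoverable builds an Error carrying a cancelJob property, and describeOutcome maps the argument given to next onto the status the text describes. The 'completed' label for a successful job is an assumption.

```javascript
// Hypothetical helper: creates an Error that asks the Queue to cancel the job.
// Omitting the reason sets cancelJob to true, as the text above allows.
function unrecoverable (message, reason) {
  const err = new Error(message)
  err.cancelJob = reason === undefined ? true : reason
  return err
}

// Illustration only: the status each kind of next() argument leads to.
function describeOutcome (outcome) {
  if (!(outcome instanceof Error)) return 'completed' // assumed status name
  return outcome.cancelJob ? 'terminated' : 'failed'
}

const outcomes = [
  describeOutcome('Radiation levels are nominal'),
  describeOutcome(new Error('Watch out for them gamma rays')),
  describeOutcome(unrecoverable('Watch out for them gamma rays', 'Turned into the big green guy')),
  describeOutcome(unrecoverable('Watch out for them gamma rays'))
]
```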
This example shows the full functionality of the rethinkdb-job-queue for processing an email job. It is just an example that was copied from the mailgun-js README with Queue functionality added. It may not work even with your api_key.
const api_key = 'key-XXXXXXXXXXXXXXXXXXXXXXX'
const domain = 'mydomain.mailgun.org'
const mailgun = require('mailgun-js')({apiKey: api_key, domain: domain});
const Queue = require('rethinkdb-job-queue')
const q = new Queue()
const job = q.createJob()
// Here we attach the jobs payload. In this case mailgun data.
job.data = {
  from: 'Super Heros <[email protected]>',
  to: '[email protected]',
  subject: 'Registration for superheros.com',
  text: 'Click this link to activate your account on superheros.com'
}
q.addJob(job).catch((err) => {
  console.error(err)
})
// Queue.process can be placed before or after adding jobs
q.process((job, next) => {
  mailgun.messages().send(job.data, function (err, body) {
    if (err && err.message === 'something really bad') {
      err.cancelJob = 'something really bad happened'
    }
    // Return here so next is not called a second time after a failure
    if (err) { return next(err) }
    console.log(body)
    next('email sent successfully')
  })
}).catch((err) => {
  console.error(err)
})