Processing jobs is simple with Kue. First create a Queue instance, much as we do when creating jobs, which gives us access to Redis and so on; then invoke jobs.process() with the associated type.
Note that, despite what the name createQueue suggests, it currently returns a singleton Queue instance, so you can configure and use only a single Queue object within your Node.js process.
In the following example we pass the callback done to email. When an error occurs we invoke done(err) to tell Kue that something went wrong; otherwise we invoke done() only once the job is complete. If this function responds with an error, it will be displayed in the UI and the job will be marked as failed.
var kue = require('kue')
  , jobs = kue.createQueue();
jobs.process('email', function(job, done){
  email(job.data.to, done);
});
Workers can pass a job result as the second parameter to done, as in done(null, result), to store it under the Job.result key. The result is also passed to complete event handlers, so job producers can receive it if they wish.
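As a minimal sketch of the two forms of done described above, the worker below is written as a plain function so it can be tried without Redis. The sendEmail helper and its messageId field are hypothetical stand-ins and are not part of Kue; the commented lines at the end show how the same function would be registered on a real queue.

```javascript
// Hypothetical mail helper: NOT part of Kue. It reports a failure for a
// missing address and otherwise hands back a fake message id.
function sendEmail(to, callback) {
  if (!to) return callback(new Error('missing recipient'));
  callback(null, { messageId: 'msg-' + to });
}

// Worker callback in the (job, done) shape that jobs.process() expects.
function emailWorker(job, done) {
  sendEmail(job.data.to, function (err, info) {
    if (err) return done(err);   // Kue marks the job as failed
    done(null, info.messageId);  // second argument is stored as the job result
  });
}

// With a real queue (assumes a reachable Redis):
//   jobs.process('email', emailWorker);
//   jobs.create('email', { to: 'a@example.com' })
//     .on('complete', function (result) { console.log(result); })
//     .save();
```

Keeping the worker as a named function rather than an inline callback is only a convenience here: it lets the success and failure paths be checked with a fake job object before any queue is involved.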
By default a call to jobs.process() will only accept one job at a time for processing. For small tasks like sending emails this is not ideal, so we may specify the maximum number of active jobs for this type by passing a number:
jobs.process('email', 20, function(job, done){
  // ...
});
Workers can temporarily pause and resume their activity. That is, after calling pause they will receive no jobs in their process callback until resume is called. The pause function gracefully shuts down this worker, and uses the same internal functionality as the shutdown method in Graceful Shutdown.
jobs.process('email', function(job, done, ctx){
  ctx.pause( function(err){
    console.log("Worker is paused... ");
    setTimeout( function(){ ctx.resume(); }, 10000 );
  }, 5000);
});
For a "real" example, let's say we need to compile a PDF from numerous slides with node-canvas. Our job may consist of the following data. Note that in general you should not store large data in the job itself; it is better to store references such as ids and pull the data in while processing.
jobs.create('slideshow pdf', {
    title: user.name + "'s slideshow"
  , slides: [...] // keys to data stored in redis, mongodb, or some other store
});
We can access this same arbitrary data within a separate process while processing, via the job.data property. In the example we render each slide one by one, updating the job's log and progress.
jobs.process('slideshow pdf', 5, function(job, done){
  var slides = job.data.slides
    , len = slides.length;
  if (!len) return done(); // nothing to render
  function next(i) {
    var slide = slides[i]; // pretend we did a query on this slide id ;)
    job.log('rendering %dx%d slide', slide.width, slide.height);
    renderSlide(slide, function(err){
      if (err) return done(err);
      job.progress(i + 1, len); // i + 1 slides are finished so far
      if (i + 1 === len) done(); // the last slide has been rendered
      else next(i + 1);
    });
  }
  next(0);
});
As of Kue 0.7.0, Queue#shutdown(fn, timeout) is available; it signals all workers to stop processing after their current active job is done. Workers will wait timeout milliseconds for their active job's done to be called, or else mark the active job as failed with a shutdown error reason. When all workers have told Kue they are stopped, fn is called.
var queue = require('kue').createQueue();
process.once( 'SIGTERM', function ( sig ) {
  queue.shutdown(function(err) {
    console.log( 'Kue is shut down.', err||'' );
    process.exit( 0 );
  }, 5000 );
});
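The same pattern can be wrapped in a small helper, sketched here under a few assumptions. The name installGracefulShutdown, the extra SIGINT handler, and the non-zero exit code on error are illustrative choices and not part of Kue. The helper relies only on the shutdown(fn, timeout) signature described above, and it accepts the process object as an optional argument so it can be exercised with a fake.

```javascript
// Registers one-time signal handlers that drain the queue before exiting.
// `queue` only needs a shutdown(fn, timeout) method; `proc` defaults to the
// real process object and exists so a fake can be supplied.
function installGracefulShutdown(queue, timeoutMs, proc) {
  proc = proc || process;
  var shuttingDown = false;
  ['SIGTERM', 'SIGINT'].forEach(function (signal) {
    proc.once(signal, function () {
      if (shuttingDown) return; // ignore a second signal while draining
      shuttingDown = true;
      queue.shutdown(function (err) {
        if (err) console.error('Kue shutdown error:', err);
        proc.exit(err ? 1 : 0); // non-zero exit if shutdown reported an error
      }, timeoutMs);
    });
  });
}

// Usage with a real queue (assumes a reachable Redis):
//   installGracefulShutdown(require('kue').createQueue(), 5000);
```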