Kue.JS手册

处理作业

Processing jobs is simple with Kue. First create a Queue instance much like we do for creating jobs, providing us access to redis etc, then invoke jobs.process() with the associated type. Note that unlike what the name createQueue suggests, it currently returns a singleton Queue instance. So you can configure and use only a single Queue object within your node.js process.

In the following example we pass the callback done to email, When an error occurs we invoke done(err) to tell Kue something happened, otherwise we invoke done() only when the job is complete. If this function responds with an error it will be displayed in the UI and the job will be marked as a failure.

var kue = require('kue')
 , jobs = kue.createQueue();

jobs.process('email', function(job, done){
  email(job.data.to, done);
});

Workers can pass job result as the second parameter to done done(null,result) to store that in Job.result key. result is also passed through complete event handlers so that job producers can receive it if they like to.

处理并发

By default a call to jobs.process() will only accept one job at a time for processing. For small tasks like sending emails this is not ideal, so we may specify the maximum active jobs for this type by passing a number:

jobs.process('email', 20, function(job, done){
  // ...
});

暂停处理

Workers can temporary pause and resume their activity. It is, after calling pause they will receive no jobs in their process callback until resume is called. pause function gracefully shutdowns this worker, and uses the same internal functionality as shutdown method in Graceful Shutdown.

jobs.process('email', function(job, done, ctx){
  ctx.pause( function(err){
    console.log("Worker is paused... ");
    setTimeout( function(){ ctx.resume(); }, 10000 );
  }, 5000);
});

更新进度

For a "real" example, let's say we need to compile a PDF from numerous slides with node-canvas. Our job may consist of the following data, note that in general you should not store large data in the job it-self, it's better to store references like ids, pulling them in while processing.

jobs.create('slideshow pdf', {
    title: user.name + "'s slideshow"
  , slides: [...] // keys to data stored in redis, mongodb, or some other store
});

We can access this same arbitrary data within a separate process while processing, via the job.data property. In the example we render each slide one-by-one, updating the job's log and process.

jobs.process('slideshow pdf', 5, function(job, done){
  var slides = job.data.slides
    , len = slides.length;

  function next(i) {
    var slide = slides[i]; // pretend we did a query on this slide id ;)
    job.log('rendering %dx%d slide', slide.width, slide.height);
    renderSlide(slide, function(err){
      if (err) return done(err);
      job.progress(i, len);
      if (i == len) done()
      else next(i + 1);
    });
  }

  next(0);
});

正常关机

As of Kue 0.7.0, a Queue#shutdown(fn, timeout) is added which signals all workers to stop processing after their current active job is done. Workers will wait timeout milliseconds for their active job's done to be called or mark the active job failed with shutdown error reason. When all workers tell Kue they are stopped fn is called.

var queue = require('kue').createQueue();

process.once( 'SIGTERM', function ( sig ) {
  queue.shutdown(function(err) {
    console.log( 'Kue is shut down.', err||'' );
    process.exit( 0 );
  }, 5000 );
});