作业完成时的 Kue 回调

Posted

技术标签:

【中文标题】作业完成时的 Kue 回调【英文标题】:Kue callback when job is completed 【发布时间】:2013-02-19 05:54:10 【问题描述】:

我的主节点实例派生出一个工作进程,该进程通过 IPC(使用内置节点 process.send()process.on('message'...)接受消息,这些消息是包含有关要添加到 Kue 的新作业信息的对象。然后它会处理这些作业。

我的主节点实例调用如下:

worker.send(jobType:'filesystem', operation: 'delete', path: fileDir);

worker 实例执行如下操作:

jobs.create(message.jobType, message).save();

jobs.process('filesystem', function(job, done) 
    fs.delete(job.data.path, function(err) 
        done(err);
    );
);

作业成功完成。

作业完成后,如何在我的主节点实例中获得类似回调的功能?如何从工作实例返回一些结果到主节点实例?

【问题讨论】:

【参考方案1】:

我相信我解决了这个问题,但我会留下未解决的问题,以防有人可以改进我的解决方案或提供更好的解决方案。

当您使用 Kue 在单独的进程中处理作业时,您不能简单地在作业完成时执行回调。这是两个进程之间通信的示例。我本来希望使用 Kue 自动提供每个作业的 id(我相信它与 Redis 中接收到的 id 相同)但是 app.js 需要在将作业的 id 发送给工作人员之前知道它收到消息时可以匹配id。

app.js

var child = require('child_process');
var async = require('async');

var worker = child.fork("./worker.js");

//When a message is received, search activeJobs for it, call finished callback, and delete the job
worker.on('message', function(m) 
    for(var i = 0; i < activeJobs.length; i++) 
        if(m.jobId == activeJobs[i].jobId) 
            activeJobs[i].finished(m.err, m.results);
            activeJobs.splice(i,1);
            break;
        
    
);

// local job system
var newJobId = 0;
var activeJobs = [];

function Job(input, callback) 
    this.jobId = newJobId;
    input.jobId = newJobId;
    newJobId++;
    activeJobs.push(this);

    worker.send(input);

    this.finished = function(err, results) 
        callback(err, results);
    



var deleteIt = function(req, res) 
    async.series([
        function(callback) 
            // An *EXAMPLE* asynchronous task that is passed off to the worker to be processed
            // and requires a callback (because of async.series)
            new Job(
                jobType:'filesystem',
                title:'delete project directory',
                operation: 'delete',
                path: '/deleteMe'
            , function(err) 
                callback(err);
            );
        ,
        //Delete it from the database
        function(callback) 
            someObject.remove(function(err) 
                callback(err);
            );
        ,
    ],
    function(err) 
        if(err) console.log(err);
    );
;

worker.js

var kue = require('kue');
var fs = require('fs-extra');

var jobs = kue.createQueue();

//Jobs that are sent arrive here
process.on('message', function(message) 
    if(message.jobType) 
        var job = jobs.create(message.jobType, message).save();
     else 
        console.error("Worker:".cyan + " [ERROR] No jobType specified, message ignored".red);
    
);

jobs.process('filesystem', function(job, done) 

    if(job.data.operation == 'delete') 
        fs.delete(job.data.path, function(err) 
            notifyFinished(job.data.jobId, err);
            done(err);
        );
    
);

function notifyFinished(id, error, results) 
    process.send(jobId: id, status: 'finished', error: error, results: results);

https://gist.github.com/winduptoy/4991718

【讨论】:

谢谢 - 我遇到了同样的问题。如果有更简单的解决方案就好了。

以上是关于作业完成时的 Kue 回调的主要内容,如果未能解决你的问题,请参考以下文章

Node.js Kue 如何重启失败的作业

完成 fs.writeFilesync 时的回调

Angular 2:ngFor 完成时的回调

查找多个回调何时完成时的编程模型

如何在解析服务器上使用 kue 安排作业?

javascript 模拟kue获取所有创建和处理的作业