NodeJS|Cluster:如何将数据从主服务器发送到所有或单个孩子/工人?

Posted

技术标签:

【中文标题】NodeJS|Cluster:如何将数据从主服务器发送到所有或单个孩子/工人?【英文标题】:NodeJS|Cluster: How to send data from master to all or single child/workers? 【发布时间】:2012-01-21 23:56:04 【问题描述】:

我有来自node 的工作(库存)脚本

var cluster = require('cluster');
var http = require('http');
var numReqs = 0;

if (cluster.isMaster) 
  // Fork workers.
  for (var i = 0; i < 2; i++) 
    var worker = cluster.fork();

    worker.on('message', function(msg) 
      if (msg.cmd && msg.cmd == 'notifyRequest') 
        numReqs++;
      
    );
  

  setInterval(function() 
    console.log("numReqs =", numReqs);
  , 1000);
 else 
  // Worker processes have a http server.
  http.Server(function(req, res) 
    res.writeHead(200);
    res.end("hello world\n");
    // Send message to master process
    process.send( cmd: 'notifyRequest' );
  ).listen(8000);

在上面的脚本中,我可以轻松地将数据从工作进程发送到主进程。但是如何将数据从 master 发送到 worker/workers?如果可能的话,举个例子。

【问题讨论】:

【参考方案1】:

因为 cluster.fork 是在 child_process.fork 之上实现的,所以您可以使用 worker.send( msg: 'test' ) 从 master 向 worker 发送消息,通过 process.send( msg: 'test' ); 从 worker 向 master 发送消息。你会收到这样的消息:worker.on('message', callback)(从worker到master)和process.on('message', callback);(从master到worker)。

这是我的完整示例,你可以浏览http://localhost:8000/进行测试,然后worker会向master发送消息,master会回复:

var cluster = require('cluster');
var http = require('http');
var numReqs = 0;
var worker;

if (cluster.isMaster) 
  // Fork workers.
  for (var i = 0; i < 2; i++) 
    worker = cluster.fork();

    worker.on('message', function(msg) 
      // we only want to intercept messages that have a chat property
      if (msg.chat) 
        console.log('Worker to master: ', msg.chat);
        worker.send( chat: 'Ok worker, Master got the message! Over and out!' );
      
    );

  
 else 
  process.on('message', function(msg) 
    // we only want to intercept messages that have a chat property
    if (msg.chat) 
      console.log('Master to worker: ', msg.chat);
    
  );
  // Worker processes have a http server.
  http.Server(function(req, res) 
    res.writeHead(200);
    res.end("hello world\n");
    // Send message to master process
    process.send( chat: 'Hey master, I got a new request!' );
  ).listen(8000);

【讨论】:

其实我想为 (web/flash)socket 客户端创建推送服务器。当前版本堆叠在 1000 个同时连接上。所以我决定用socket.io监听器创建几个worker。这意味着我需要以异步方式将数据传递给工作人员。 听起来不错,请确保将 Socket.IO 与 RedisStore 一起使用。 这行不通。 varfor 里面? worker 将保留最后一个工人分叉,而不是每个工人(特别是在事件回调中)。要么你不关心所有的,你只是把你的回调封装起来,或者你把所有的工人都放在数组中。 对不起,我复制了他的一部分代码。我不会把我所有的工人都放在一个数组中,因为我只是想证明它的功能。 @alessioalex 通过工作人员发送数据会比使用 redis 更慢还是更快?【参考方案2】:

我在寻找一种向所有子进程发送消息的方法时发现了这个线程,幸好能够通过有关数组的 cmets 解决这个问题。只是想说明使用这种方法向所有子进程发送消息的潜在解决方案。

var cluster = require('cluster');
var http = require('http');
var numReqs = 0;
var workers = [];

if (cluster.isMaster) 
  // Broadcast a message to all workers
  var broadcast = function() 
    for (var i in workers) 
      var worker = workers[i];
      worker.send( cmd: 'broadcast', numReqs: numReqs );
    
  

  // Fork workers.
  for (var i = 0; i < 2; i++) 
    var worker = cluster.fork();

    worker.on('message', function(msg) 
      if (msg.cmd) 
        switch (msg.cmd) 
          case 'notifyRequest':
            numReqs++;
          break;
          case 'broadcast':
            broadcast();
          break;
        
    );

    // Add the worker to an array of known workers
    workers.push(worker);
  

  setInterval(function() 
    console.log("numReqs =", numReqs);
  , 1000);
 else 
  // React to messages received from master
  process.on('message', function(msg) 
    switch(msg.cmd) 
      case 'broadcast':
        if (msg.numReqs) console.log('Number of requests: ' + msg.numReqs);
      break;
    
  );

  // Worker processes have a http server.
  http.Server(function(req, res) 
    res.writeHead(200);
    res.end("hello world\n");
    // Send message to master process
    process.send( cmd: 'notifyRequest' );
    process.send( cmd: 'broadcast' );
  ).listen(8000);

【讨论】:

【参考方案3】:

以下是我针对类似问题实施解决方案的方法。通过挂钩cluster.on('fork'),您可以在工人被分叉时将消息处理程序附加到工人(而不是将它们存储在数组中),这具有处理工人死亡或断开连接以及新工人被分叉的情况的额外优势。

这个 sn-p 会从 master 向 所有 worker 发送消息。

if (cluster.isMaster) 
    for (var i = 0; i < require('os').cpus.length; i++) 
        cluster.fork();
    

    cluster.on('disconnect', function(worker) 
        cluster.fork();
    

    // When a new worker process is forked, attach the handler
    // This handles cases where new worker processes are forked
    // on disconnect/exit, as above.
    cluster.on('fork', function(worker) 
        worker.on('message', messageRelay);
    

    var messageRelay = function(msg) 
        Object.keys(cluster.workers).forEach(function(id) 
            cluster.workers[id].send(msg);
        );
    ;

else 
    process.on('message', messageHandler);

    var messageHandler = function messageHandler(msg) 
        // Worker received message--do something
    ;

【讨论】:

【参考方案4】:

我了解您向集群中的所有节点工作进程广播的目的,尽管您不能像这样发送套接字组件,但可以解决此目的。我将尝试用一个例子来解释:

第 1 步:当客户端操作需要广播时:

Child.js (Process that has been forked) :

socket.on("BROADCAST_TO_ALL_WORKERS", function (data) 

    process.send(cmd : 'BROADCAST_TO_ALL_WORKERS', message :data.message);
) 

第 2 步:在集群创建方面

Server.js (Place where cluster forking happens):

if (cluster.isMaster) 

  for (var i = 0; i < numCPUs; i++) 

    var worker = cluster.fork();

    worker.on('message', function (data) 
     if (data.cmd === "BROADCAST_TO_ALL_WORKERS") 
       console.log(server_debug_prefix() + "Server Broadcast To All, Message : " + data.message + " , Reload : " + data.reload + " Player Id : " + data.player_id);
        Object.keys(cluster.workers).forEach(function(id) 
            cluster.workers[id].send(cmd : "BROADCAST_TO_WORKER", message : data.message);
        );
      
    );
  

  cluster.on('exit', function (worker, code, signal) 
    var newWorker = cluster.fork();
    newWorker.on('message', function (data) 
      console.log(data);
      if (data.cmd === "BROADCAST_TO_ALL_WORKERS") 
        console.log(data.cmd,data);
        Object.keys(cluster.workers).forEach(function(id) 
            cluster.workers[id].send(cmd : "BROADCAST_TO_WORKER", message : data.message);
        );
      
    );
  );
 
else 
  //Node Js App Entry
  require("./Child.js");

第 3 步:在子进程中广播 -

-> 把这个放在 Child.js 中的 io.on("connection") 之前

process.on("message", function(data)
    if(data.cmd === "BROADCAST_TO_WORKER")
        io.sockets.emit("SERVER_MESSAGE",  message: data.message, reload: data.reload, player_id : data.player_id );
    
);

我希望这会有所帮助。如果需要进一步说明,请告诉我。

【讨论】:

【参考方案5】:

您应该能够像这样从 master 向 worker 发送消息:

worker.send(message:'hello')

因为“cluster.fork 是在 child_process.fork 之上实现的”(cluster.fork 是在 child_process.fork 之上实现的)

【讨论】:

是的,它有效,谢谢!换句话说:在分叉工人时,我应该将它们存储在一个数组中。并迭代这个数组以便将数据发送给每个孩子。是否有任何其他方式无需存储和迭代即可将数据发送给所有工作人员。 如果您不想将工作人员存储到数组中并遍历它们以发送消息,您可以使用 unix 域套接字将消息从主服务器传递给工作人员。 我想你可以在 master 中创建一个 EventEmitter,只要收到消息,它们就会发出一个事件。在创建每个worker之后,您只需向EventEmitter添加一个监听器,它将向worker发送消息。当然,这仍然是实现将侦听器的引用(也包括工作人员)存储到 EventEmitter 中,但是嘿,至少你不必看它【参考方案6】:

如果您只需要为您的子进程发送简单的配置数据,您可以使用cluster.fork() 发送环境变量。这很有用,并且有利于通过集群和进程send 方法发送消息。

const cluster = require('cluster')

if (cluster.isMaster) 
  cluster.fork(
    MY_DATA: 'something here'
  )
 else 
  console.log(process.env.MY_DATA) // "something here"

【讨论】:

以上是关于NodeJS|Cluster:如何将数据从主服务器发送到所有或单个孩子/工人?的主要内容,如果未能解决你的问题,请参考以下文章

NodeJS Cluster如何在工作人员之间共享对象数组

nodejs实现多进程(cluster 模式)

Nodejs cluster模块深入探究

Nodejs集群仅将任务分配给一个工作者(任何)

XAMPP - 如何从主目录执行 PHP 文件?

如何以编程方式确定从主功能区创建的 Outlook 新项目 --> 新项目