如何使用 nodejs 将消息发送到 RabbitMQ 队列

Posted

技术标签:

【中文标题】如何使用 nodejs 将消息发送到 RabbitMQ 队列【英文标题】:How do I send messages to RabbitMQ Queue, using nodejs 【发布时间】:2021-09-26 00:19:58 【问题描述】:

我目前正在构建一个负载平衡工具,以对服务器如何处理 http 协议请求进行不同的压力测量。目前,我只能发送一吨,但我无法全部处理。这就是为什么我希望使用 AMQP 将其全部放入像 rabbit 一样的消息队列中。

我当前的代码https://github.com/yugely/Stork

我目前正在使用事件循环和超时来充分完成我打算完成的工作。

我想通过使用我当前的记录器之一来使用 RabbitMQ,将消息“发送”到消息队列中。我不知道如何对其进行模块化,因此我不必不断创建频道,因为所有教程似乎只是相互复制粘贴,而不涉及如何在外部文件中使用它。

我希望有人能把我引向我可能在欺骗的东西。我什至不确定如何问这个问题。我一直在研究 RabbitMQ 和 AMQP 来处理项目的消息队列。我面临的问题是我不知道如何向rabbit mq 发送消息。让我通过在rabbitmq网站上复制第一个教程来说明我的理解:

发送.js

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(error0, connection) 
    if (error0) 
        throw error0;
    
    connection.createChannel(function(error1, channel) 
        if (error1) 
            throw error1;
        

        var queue = 'hello';
        var msg = 'Hello World!';

        channel.assertQueue(queue, 
            durable: false
        );
        /*
         How do I do this outside the functions so receive gets it? How can I call a method/function to send a message to an existing queue through an existing channel?
        */
        channel.sendToQueue(queue, Buffer.from(msg));

        console.log(" [x] Sent %s", msg);
    );
    setTimeout(function() 
        connection.close();
        process.exit(0);
    , 500);
);

receive.js

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(error0, connection) 
    if (error0) 
        throw error0;
    
    connection.createChannel(function(error1, channel) 
        if (error1) 
            throw error1;
        

        var queue = 'hello';

        channel.assertQueue(queue, 
            durable: false
        );

        console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
        /*
    How do I set up a consumer outside this function? 
         */
        channel.consume(queue, function(msg) 
            console.log(" [x] Received %s", msg.content.toString());
        , 
            noAck: true
        );
    );
);

    对于发件人,我是否总是为每条消息创建一个新频道?

    当您运行“node”终端命令时,所有教程都接受参数。我目前看到这个工作的唯一方法是使用“子进程”库,但这将是坏消息,对吧?那不会创建另一个nodejs集群吗?

    如果我必须使用客户端发送消息,我可以使用 axios 吗? (我见过一些人声称他们能够,但我不确定)。既然会使用amqp协议,那么amqp客户端是什么?

    或者这些队列是在入口文件中实例化的吗?就像您在运行入口点命令时设置队列,然后允许不同的“事件”向队列发送消息?

如何模块化?

只是为了说明,这里是我当前的 axios 代码

RadioFlyer.js

    await axios(flight.journal.actions[modkey]).then(function (response) 
        reaction.key.type = messageType.HTTPResponse.Okay
        reaction.message = response === undefined ?  response : "no response"
        let smaug = typeof reaction.message.status === "undefined" ?  reaction.message : reaction.message.status
        flight.journal.reactions.push(reaction)
        let pulse = 
            id: flight.id + "-" + index,
                timestamp: Date.now(),
            body: 
            payload: 
                protocol : reaction.protocol,
                    type: messageType.HTTPResponse.Okay,
                    url: flight.journal.actions[modkey].baseURL,
                    status: smaug
                
            
        
       /*
        Can I emit a messaging event to my logger
        */
        //emit
        seidCar.HTTPLogger.emit("logHttp", reaction)
        //emit
        seidCar.HeartbeatLogger.emit("pulse", pulse)
    ).catch(function (error) 
      reaction.key.type = messageType.HTTPResponse.Error
      reaction.message = error.response === undefined ?  error.code : error.response

      let smaug = typeof reaction.message.status === "undefined" ?  reaction.message : reaction.message.status
      let pulse = 
            id: flight.id + "-" + index,
            timestamp: Date.now(),
            body: 
                payload: 
                    protocol : reaction.protocol,
                    type: messageType.HTTPResponse.Error,
                    url: flight.journal.actions[modkey].baseURL,
                    status: smaug
                
            
        
      let err = 
          id: flight.id+"-"+index+"-ERROR",
          timestamp : Date.now(),
          fatal : false,
          potentialFix : "Examine Http Call with id: " + flight.id + "-" + index,
          body: 
              payload: 
                  protocol : reaction.protocol,
                  type: messageType.HTTPResponse.Error,
                  url: flight.journal.actions[modkey].baseURL,
                  status: smaug
              
          
      
      flight.journal.reactions.push(reaction)
      //emit
      seidCar.HTTPLogger.emit("logHttp", reaction)
      //emit
      seidCar.ErrorLogger.emit("logError", err)
        //emit
        seidCar.HeartbeatLogger.emit("pulse", pulse)
    )

让我的记录器处理程序发送到队列?

HTTPLogger.js


/*
Can I now send the message to the queue here, in this file?
*/
const HTTPEventLogger = require("events")
const emitter = new HTTPEventLogger()
const errorEmitter = require("./ErrorLogger").Emitter

class HTTPLogger extends HTTPEventLogger 
  logHttp(message) 
      switch (message.key.type) 
        case "ERROR":
          if (message !== undefined) 
            console.log(message)
           else 
            errorEmitter.emit("tossIt", 
              error:"HTTP error Message is undefined in ~/Homestasis/Agent/HTTPLoggerjs.",
              poi:"check for recent additions to pilot agents in ~/Pilots/Agent",
              timestamp: Date.now(),
              potentialFix:"look to where you are emitting the message. Function Scope"
            )
          
          break;
        case "OKAY":
          if (message !== undefined) 
            console.log(message)//bState.message.status)
           else 
            errorEmitter.emit("tossIt", 
              error:"HTTP okay Message is undefined in ~/Homestasis/Agent/HTTPLoggerjs.",
              poi:"check for recent additions to pilot agents in ~/Pilots/Agent",
              timestamp: Date.now(),
              potentialFix:"look to where you are emitting the message. Function Scope"
            )
          
          break;
        default:
          errorEmitter.emit("tossIt", "this is a tossIt error log. No http key type was caught bSate = " + bState)
      
  

var logger = new HTTPLogger()
emitter.on("logHttp",logger.logHttp)

exports.Emitter = emitter

这就是我想发送消息的方式。

我希望能够以同样的方式接收它

鉴于我认为它目前的工作方式,我不确定如何实现它,而且我错过了等式的关键部分。感谢您的宝贵时间!

【问题讨论】:

这个范围很广。 1) 你可以,但不确定你为什么会这样做;像任何其他 JS 异步编程一样,您可以通过多种方式公开通道。 2)我不知道你在问什么。 3)不确定你在问什么;从 Node 与 Rabbit 交谈时,您已经在使用 AMQP 库。您是在谈论直接来自浏览器的请求吗? 4) 不知道你在问什么。 javascript 假设我有一个发送者类。目前,我正在构建一个事件驱动的负载平衡工具,以便发出事件,并且我希望能够在不同的队列中发送不同的消息。所以下面说明了一个我可以在不同地方使用的“发送者”类。问题是,对于1,我只是每次都打开一个频道吗?如果是这样,这会变得更容易,我可以从这个类中调用一个方法。这就是我最想问的:javascript var amqp = require('amqplib/callback_api'); class Sender sendMessage(msg) //create connection, channel and send message through buffer 请编辑问题以添加更多信息。创建连接相对昂贵,关闭连接会关闭所有关联的通道。目前尚不清楚您认为自己遇到了什么问题。 @DaveNewton 感谢您允许我添加内容。我对这一切仍然很不满。 【参考方案1】:

我决定不使用 RabbitMQ。

对于节点,我要使用 RSMQ。我意识到我从根本上需要改变我看待消息队列实际工作的方式,以使它们实际工作。我正在使用 RSMQ,因为我可以理解它如何适合我构建项目的方式。起初我使用的是 ZeroMQ(我仍然可以,类固醇上的套接字的柔韧性)。但是,当我真正考虑像“主线神经”一样使用它时,我确实喜欢它的实际功能和东西。

我想构建它的方式是这样的:

Axios 进行调用,触发一个事件到 httplogger,它触发一个事件到消息队列。所以:

RadioFlyer.js > HttpLogger.js > RSMQ-Messagequeue.js > 接下来的事情 > 和下一个...

RSMQ-Messagequeue.js 充当“中间人”或中间人,在超载时停止车轮。

RSMQ 的优点是我还打算实现一个 Redis 缓存(我可以实现 ZeroMQ 并做很多相同的事情),但我喜欢 RSMQ 允许更多的队列选项,而不是 1 个包含许多主题的队列。

还有一个更充实的答案。

【讨论】:

以上是关于如何使用 nodejs 将消息发送到 RabbitMQ 队列的主要内容,如果未能解决你的问题,请参考以下文章

Rabbit MQ 消息确认 ACK 解析,消息确认详解 Rabbit MQ

Rabbit-MQ 2

logback整合rabbit

Rabbit-MQ常见问题摘要

nodejs中使用RabbitMq消息中心系统的方式

rabbit exchange routing queue