Node下RabbitMQ的使用

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Node下RabbitMQ的使用相关的知识,希望对你有一定的参考价值。

参考技术A

github

Message acknowledgment 消息回执

在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在timeout概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开。

Message durability 消息持久化

将队列中的消息进行本地持久化存储,避免因为意外原因导致丢失的大部分消息,通过设置 durable: true

Prefetch count 消息处理树

通过设置每一个消费者处理消息的数量,如果没有完成确认,就不再派发消息给消费者

exchange 交换器

生产者并不直接将消息发送到对应队列中,而是先发送到exchange 交换器中,交换器再通过一定的规则分发给一个或多个队列。交换器有四种类型:

routing key 路由key(生产者定义)

生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。
在Exchange Type与binding key固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。
RabbitMQ为routing key设定的长度限制为255 bytes。

routing key 作用于发送的消息上

Binding
Bindg将exchange和Queue联系起来

Bing作用于消息队列Queue上

Exchange Types 交换器的类型

RPC 远程过程调用
消息队列本身并不具备回调的功能,即发出一个消息后,生产者并不知道消费者返回的消息(能够知道是否消费,通过 ch.ack(msg) ),通过RPC能够返回消费者的消息。其原理在于新建一个replyQueue,消费者在之前订阅该队列

思考:在HTTP1.1的情况下,server 接收到前端响应提交消息,与接收到replyQueue的消息是两个独立的事件,没办法在前者的响应中加上后者返回的信息。因此只能通过ws协议实现推送。

P.S 参考资料 RabbitMQ基础概念详细介绍

使用 Node.js 的 RabbitMQ RPC

【中文标题】使用 Node.js 的 RabbitMQ RPC【英文标题】:RabbitMQ RPC using Node.js 【发布时间】:2017-09-08 01:49:39 【问题描述】:

我使用 RabbitMQ 在 Node.js 中实现 RPC。

我按照教程,我为每个客户端声明相同的队列名称为'rpc_client'为了高权限,这里是client.js在服务器中调用函数:

const amqp = require('amqplib');

async function client()
    let args = process.argv.slice(2);
    let corr = generateUuid();
    let num = parseInt(args[0]);
    try 
        let conn = await amqp.connect('amqp://127.0.0.1');
        let ch = await conn.createChannel();
        let q = await ch.assertQueue('rpc_client');

        console.log(' [x] Requesting fib(%d)', num);
        console.log(q.queue)

        await ch.consume(q.queue,msg=>
            if (msg.properties.correlationId == corr) 
                console.log(' [.] Got %s', msg.content.toString());
                ch.ack(msg)
                setTimeout(function()  conn.close(); process.exit(0) , 5500);
            
        ,noAck:false);

        ch.sendToQueue('rpc_server',
        new Buffer(num.toString()),
         correlationId: corr, replyTo: q.queue );

     catch(err)
        console.error(err);
    


function generateUuid() 
    return Math.random().toString() +
           Math.random().toString() +
           Math.random().toString();


client();

但我发现当我一次运行多个客户端时,在前一个客户端的连接关闭之前,后一个客户端不会运行消费回调(从服务器获取答案并打印它)。例如第二个客户端将得到答案并打印它,它的连接将在 5500 毫秒内关闭,第二个客户端必须等待第一个客户端关闭并打印答案,然后再等待 5500 毫秒关闭。

那么为什么会这样呢?因为队列可以同时消费两个worker中的两个按摩。

这里是server.js

async function server()
    try 
        let conn = await amqp.connect('amqp://127.0.0.1');
        let ch = await conn.createChannel();
        process.once('SIGINT',()=>conn.close());

        let q = await ch.assertQueue('rpc_server');
        ch.prefetch(1);
        console.log(' [x] Awaiting RPC requests');

        await ch.consume(q.queue,msg=>
            let n = parseInt(msg.content.toString());

            console.log(" [.] fib(%d)", n);

            let r = fibonacci(n);

            ch.sendToQueue(msg.properties.replyTo,
            new Buffer(r.toString()),
            correlationId: msg.properties.correlationId);

            ch.ack(msg);
        ,noAck:false);


     catch(err) 
        console.error(err);
    


server();

function fibonacci (n , ac1 = 1 , ac2 = 1) 
    if( n <= 1 ) return ac2;

    return fibonacci (n - 1, ac2, ac1 + ac2);

【问题讨论】:

【参考方案1】:

如果队列名称相同,那么它就是同一个队列。不是两个同名的队列。在这种情况下,客户端按顺序而不是并行获取消息是有意义的。

所以尝试使用不同的队列名称,它应该可以工作。

【讨论】:

但是为什么同一个队列的服务器可以并行运行呢?参考rabbitmq.com/tutorials/tutorial-two-javascript.html 多个消费者订阅同一个队列。在这种情况下,他们将收到备用消息。就像奇数的会成为第一个消费者,然后偶数会成为第二个。此处提供有关此模式的更多详细信息:rabbitmq.com/tutorials/tutorial-two-javascript.html 对了,所以我的问题,有两个客户端,他们调用服务器中的函数,然后服务器会产生两个消息回rpc_client队列,所以两个客户端可以处理他们并行。但他们没有,为什么? @laoqiren 我也有同样的问题。你找到解决办法了吗?【参考方案2】:

据我所知,您必须将预取更改为大于 1 的数字。

basic.qos (prefetch) 方法允许你限制未确认的数量 消费时通道(或连接)上的消息。

所以问题出在这个参数上。您的频道将等待您的第一个请求得到处理和确认。

【讨论】:

以上是关于Node下RabbitMQ的使用的主要内容,如果未能解决你的问题,请参考以下文章

Rabbitmq 服务启动后停止 | Windows下启动报错Error: unable to connect to node rabbit@xxx: nodedown

node.js,Rabbitmq和Docker:使用seneca的服务似乎在rabbitmq之前启动

CentOS 7 下 RabbitMQ 集群搭建

Linux下rabbitmq的集群搭建

在Node.js中使用RabbitMQ系列一 Hello world

Windows下安装RabbitMQ报错:unable to perform an operation on node时的解决方案