使用 Node.js 的 RabbitMQ RPC

Posted

技术标签:

【中文标题】使用 Node.js 的 RabbitMQ RPC【英文标题】:RabbitMQ RPC using Node.js 【发布时间】:2017-09-08 01:49:39 【问题描述】:

我使用 RabbitMQ 在 Node.js 中实现 RPC。

我按照教程,我为每个客户端声明相同的队列名称为'rpc_client'为了高权限,这里是client.js在服务器中调用函数:

const amqp = require('amqplib');

async function client()
    let args = process.argv.slice(2);
    let corr = generateUuid();
    let num = parseInt(args[0]);
    try 
        let conn = await amqp.connect('amqp://127.0.0.1');
        let ch = await conn.createChannel();
        let q = await ch.assertQueue('rpc_client');

        console.log(' [x] Requesting fib(%d)', num);
        console.log(q.queue)

        await ch.consume(q.queue,msg=>
            if (msg.properties.correlationId == corr) 
                console.log(' [.] Got %s', msg.content.toString());
                ch.ack(msg)
                setTimeout(function()  conn.close(); process.exit(0) , 5500);
            
        ,noAck:false);

        ch.sendToQueue('rpc_server',
        new Buffer(num.toString()),
         correlationId: corr, replyTo: q.queue );

     catch(err)
        console.error(err);
    


function generateUuid() 
    return Math.random().toString() +
           Math.random().toString() +
           Math.random().toString();


client();

但我发现当我一次运行多个客户端时,在前一个客户端的连接关闭之前,后一个客户端不会运行消费回调(从服务器获取答案并打印它)。例如第二个客户端将得到答案并打印它,它的连接将在 5500 毫秒内关闭,第二个客户端必须等待第一个客户端关闭并打印答案,然后再等待 5500 毫秒关闭。

那么为什么会这样呢?因为队列可以同时消费两个worker中的两个按摩。

这里是server.js

async function server()
    try 
        let conn = await amqp.connect('amqp://127.0.0.1');
        let ch = await conn.createChannel();
        process.once('SIGINT',()=>conn.close());

        let q = await ch.assertQueue('rpc_server');
        ch.prefetch(1);
        console.log(' [x] Awaiting RPC requests');

        await ch.consume(q.queue,msg=>
            let n = parseInt(msg.content.toString());

            console.log(" [.] fib(%d)", n);

            let r = fibonacci(n);

            ch.sendToQueue(msg.properties.replyTo,
            new Buffer(r.toString()),
            correlationId: msg.properties.correlationId);

            ch.ack(msg);
        ,noAck:false);


     catch(err) 
        console.error(err);
    


server();

function fibonacci (n , ac1 = 1 , ac2 = 1) 
    if( n <= 1 ) return ac2;

    return fibonacci (n - 1, ac2, ac1 + ac2);

【问题讨论】:

【参考方案1】:

如果队列名称相同,那么它就是同一个队列。不是两个同名的队列。在这种情况下,客户端按顺序而不是并行获取消息是有意义的。

所以尝试使用不同的队列名称,它应该可以工作。

【讨论】:

但是为什么同一个队列的服务器可以并行运行呢?参考rabbitmq.com/tutorials/tutorial-two-javascript.html 多个消费者订阅同一个队列。在这种情况下,他们将收到备用消息。就像奇数的会成为第一个消费者,然后偶数会成为第二个。此处提供有关此模式的更多详细信息:rabbitmq.com/tutorials/tutorial-two-javascript.html 对了,所以我的问题,有两个客户端,他们调用服务器中的函数,然后服务器会产生两个消息回rpc_client队列,所以两个客户端可以处理他们并行。但他们没有,为什么? @laoqiren 我也有同样的问题。你找到解决办法了吗?【参考方案2】:

据我所知,您必须将预取更改为大于 1 的数字。

basic.qos (prefetch) 方法允许你限制未确认的数量 消费时通道(或连接)上的消息。

所以问题出在这个参数上。您的频道将等待您的第一个请求得到处理和确认。

【讨论】:

以上是关于使用 Node.js 的 RabbitMQ RPC的主要内容,如果未能解决你的问题,请参考以下文章

node.js 中的二进制 RPC

node.js,Rabbitmq和Docker:使用seneca的服务似乎在rabbitmq之前启动

在Node.js中使用RabbitMQ系列一 Hello world

在Node.js中使用RabbitMQ系列二 任务队列

使用 amqplib for Node.JS 控制 RabbitMQ 消费者的消费率

通过 JSON RPC 与 SignalR/Node.JS 推送消息