使用 Node.js 的 RabbitMQ RPC
Posted
技术标签:
【中文标题】使用 Node.js 的 RabbitMQ RPC【英文标题】:RabbitMQ RPC using Node.js 【发布时间】:2017-09-08 01:49:39 【问题描述】:我使用 RabbitMQ 在 Node.js 中实现 RPC。
我按照教程,我为每个客户端声明相同的队列名称为'rpc_client'为了高权限,这里是client.js
在服务器中调用函数:
const amqp = require('amqplib');
async function client()
let args = process.argv.slice(2);
let corr = generateUuid();
let num = parseInt(args[0]);
try
let conn = await amqp.connect('amqp://127.0.0.1');
let ch = await conn.createChannel();
let q = await ch.assertQueue('rpc_client');
console.log(' [x] Requesting fib(%d)', num);
console.log(q.queue)
await ch.consume(q.queue,msg=>
if (msg.properties.correlationId == corr)
console.log(' [.] Got %s', msg.content.toString());
ch.ack(msg)
setTimeout(function() conn.close(); process.exit(0) , 5500);
,noAck:false);
ch.sendToQueue('rpc_server',
new Buffer(num.toString()),
correlationId: corr, replyTo: q.queue );
catch(err)
console.error(err);
function generateUuid()
return Math.random().toString() +
Math.random().toString() +
Math.random().toString();
client();
但我发现当我一次运行多个客户端时,在前一个客户端的连接关闭之前,后一个客户端不会运行消费回调(从服务器获取答案并打印它)。例如第二个客户端将得到答案并打印它,它的连接将在 5500 毫秒内关闭,第二个客户端必须等待第一个客户端关闭并打印答案,然后再等待 5500 毫秒关闭。
那么为什么会这样呢?因为队列可以同时消费两个worker中的两个按摩。
这里是server.js
:
async function server()
try
let conn = await amqp.connect('amqp://127.0.0.1');
let ch = await conn.createChannel();
process.once('SIGINT',()=>conn.close());
let q = await ch.assertQueue('rpc_server');
ch.prefetch(1);
console.log(' [x] Awaiting RPC requests');
await ch.consume(q.queue,msg=>
let n = parseInt(msg.content.toString());
console.log(" [.] fib(%d)", n);
let r = fibonacci(n);
ch.sendToQueue(msg.properties.replyTo,
new Buffer(r.toString()),
correlationId: msg.properties.correlationId);
ch.ack(msg);
,noAck:false);
catch(err)
console.error(err);
server();
function fibonacci (n , ac1 = 1 , ac2 = 1)
if( n <= 1 ) return ac2;
return fibonacci (n - 1, ac2, ac1 + ac2);
【问题讨论】:
【参考方案1】:如果队列名称相同,那么它就是同一个队列。不是两个同名的队列。在这种情况下,客户端按顺序而不是并行获取消息是有意义的。
所以尝试使用不同的队列名称,它应该可以工作。
【讨论】:
但是为什么同一个队列的服务器可以并行运行呢?参考rabbitmq.com/tutorials/tutorial-two-javascript.html 多个消费者订阅同一个队列。在这种情况下,他们将收到备用消息。就像奇数的会成为第一个消费者,然后偶数会成为第二个。此处提供有关此模式的更多详细信息:rabbitmq.com/tutorials/tutorial-two-javascript.html 对了,所以我的问题,有两个客户端,他们调用服务器中的函数,然后服务器会产生两个消息回rpc_client
队列,所以两个客户端可以处理他们并行。但他们没有,为什么?
@laoqiren 我也有同样的问题。你找到解决办法了吗?【参考方案2】:
据我所知,您必须将预取更改为大于 1 的数字。
basic.qos (prefetch) 方法允许你限制未确认的数量 消费时通道(或连接)上的消息。
所以问题出在这个参数上。您的频道将等待您的第一个请求得到处理和确认。
【讨论】:
以上是关于使用 Node.js 的 RabbitMQ RPC的主要内容,如果未能解决你的问题,请参考以下文章
node.js,Rabbitmq和Docker:使用seneca的服务似乎在rabbitmq之前启动
在Node.js中使用RabbitMQ系列一 Hello world