nodejs操作消息队列RabbitMQ

Posted ly570

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了nodejs操作消息队列RabbitMQ相关的知识,希望对你有一定的参考价值。

一. 什么是消息队列

消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。
其主要用途:不同进程Process/线程Thread之间通信。

为什么会产生消息队列?有几个原因:

不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;

不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;

二. 常用的消息队列

RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq

三. 使用场景

异步处理

应用解耦

流量削峰

四 使用amqplib操作RabbitMQ

安装 amqplib

npm install amqplib
生产者:

let amqp = require(‘amqplib‘);

class RabbitMQ
constructor()
this.hosts = [];
this.index = 0;
this.length = this.hosts.length;
this.open = amqp.connect(this.hosts[this.index]);

sendQueueMsg(queueName, msg, errCallBack)
let self = this;

self.open
.then(function (conn)
return conn.createChannel();
)
.then(function (channel)
return channel.assertQueue(queueName).then(function (ok)
return channel.sendToQueue(queueName, new Buffer(msg),
persistent: true
);
)
.then(function (data)
if (data)
errCallBack && errCallBack("success");
channel.close();

)
.catch(function ()
setTimeout(() =>
if (channel)
channel.close();

, 500)
);
)
.catch(function ()
let num = self.index++;

if (num <= self.length - 1)
self.open = amqp.connect(self.hosts[num]);
else
self.index == 0;

);



let mq = new RabbitMQ();
mq.sendQueueMsg(‘testQueue‘, ‘123‘, (error) =>
console.log(error)
)
消费者

let amqp = require(‘amqplib‘);

class RabbitMQ
constructor()
this.hosts = [];
this.index = 0;
this.length = this.hosts.length;
this.open = amqp.connect(this.hosts[this.index]);


receiveQueueMsg(queueName, receiveCallBack, errCallBack)
let self = this;

self.open
.then(function (conn)
return conn.createChannel();
)
.then(function (channel)
return channel.assertQueue(queueName)
.then(function (ok)
return channel.consume(queueName, function (msg)
if (msg !== null)
let data = msg.content.toString();
channel.ack(msg);
receiveCallBack && receiveCallBack(data);

)
.finally(function ()
setTimeout(() =>
if (channel)
channel.close();

, 500)
);
)
)
.catch(function ()
let num = self.index++;
if (num <= self.length - 1)
self.open = amqp.connect(self.hosts[num]);
else
self.index = 0;
self.open = amqp.connect(self.hosts[0]);

);



let mq = new RabbitMQ();
mq.receiveQueueMsg(‘testQueue‘,(msg) =>

console.log(msg)//123
)
打开mq后台 http://127.0.0.1:15672/ 看到新增队列,接受一条消息

 

 

当运行消费者代码时输入 123,消息队列消息为0

 

 


---------------------

以上是关于nodejs操作消息队列RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 nodejs 将消息发送到 RabbitMQ 队列

rabbitmq消息队列介绍

RabbitMQ介绍 + python操作

nodejs中使用RabbitMq消息中心系统的方式

JAVA03_21学习总结(RabbitMQ消息队列)

专题┃RabbitMQ实战:高效部署分布式消息队列(内含系统操作程序设计网络安全书单)