RabbitMQ 超详细入门篇
Posted Java.慈祥
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ 超详细入门篇相关的知识,希望对你有一定的参考价值。
RabbitMQ 入门篇🚪
MQ 的基本概念:
什么是 MQ ?
MQ全称为Message Queue即消息队列
-
"消息队列"
是在消息的传输过程中保存消息的容器 -
它是典型的:生产者————消费者模型
生产者不断向消息队列中生产消息 ———————— 消费者不断的从队列中获取消息.
这样的好处: 生产者只需要关注发消息,消费者只需要关注收消息,二者没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦.
为什么要使用 MQ?
或者说MQ 有什么好处,MQ 主要可以实现三种功能:
服务解耦
-
场景:服务A产生数据, 而服务B,C,D需要这些数据
那么我们可以在A服务中直接调用B,C,D服务,把数据传递到下游服务即可;
-
但
随着我们的应用规模不断扩大,会有更多的服务需要A的数据
如果有几十甚至几百个下游服务,而且会不断变更,再加上还要考虑下游服务出错的情况 A服务中调用代码的维护会极为困难
程序非常的耦合
-
而 ,通过 MQ消息队列 可以实现,对程序的
解耦
A服务只需要向消息服务器发送消息,而不用考虑谁需要这些数据
下游服务如果需要数据,自行从消息服务器订阅消息,不再需要数据时则取消订阅即可
流量削峰
-
场景:
我们有一个应用,平时访问量是每秒300请求,我们用一台服务器即可轻松应对 √
但,在高峰期, 访问量瞬间翻了十倍, 达到每秒3000次请求, 单台服务器无法应对
我们增加到10台服务器,减压
-
而,很多时候这种高压 每天只出现一次,每次只有半小时
那么我们10台服务器在多数时间都只分担每秒几十次请求,这样就有点浪费资源了
-
使用MQ来进行流量削峰
我们可以对于这种,可能会突然产生高请求的功能,设置一个MQ
当用户发起请求后台并不会立刻处理,而是通过 MQ 发送一个请求,发送到队列里面,排队等待处理…
我们的后台,接收者,发现队列中有消息,一个一个的取出,进行后台处理…
避免了同一时刻大量的请求,而处理不过来导致 服务崩溃~
异步调用
-
场景:
对于有些服务之间的调用会有很长时间的响应,而用户并不能接受这么时间的响应:
A 调用 B,B 需要花费很长时间执行,但是 A 需要知道 B 什么时候可以执行完给页面响应…
-
外卖支付
相信大家都点过外卖,用户支付完成之后,到真正外卖到手是一个很漫长复杂的过程~
我们不可能一直停留在页面上进行等待~
支付后————发送支付成功的通知————再寻找外卖小哥来进行配送…
而寻找外卖小哥的过程非常耗时,高峰期,可能要等待几十秒甚至更长,这样就造成整条调用链路响应非常缓慢
-
MQ解决方案:
用户下单,订单数据可以发送到消息队列服务器,立刻响应客户端
为您寻找骑手,整条链路的响应时间只有200毫秒左右
消息接收方,监听获取每一个订单消息后台缓慢的寻找外卖小哥~
AMQP 和 JMS ⭐
AMQP 和 JMS 是目前市面上常见的两种 消息队列协议
AMQP
-
AMQP
高级消息队列协议!
是应用层协议的一个开放标准,为面向消息的中间件设计,兼容 JMS
RabbitMQ 就是基于 AMQP 协议实现的
JMS
-
JMS
Java 消息服务
JMS的客户端之间可以通过
JMS服务
进行异步的消息传输 -
JMS(Java Message Service,Java消息服务)API是一个消息服务的标准或者说是规范
就像JDBC一样通过接口定义一组规范,不同的实现尝试实现对于的驱动来完成开发...
它使分布式通信耦合度更低,消息服务更加可靠以及异步性。 ActiveMQ 就是基于 JMS 规范实现的
总结:
规范:
-
AMQP 为消息定义了线路层(wire-level protocol)的协议
-
JMS所定义的是API规范
跨平台
- Java 体系中,多个client均可以通过JMS进行交互,不需要应用修改代码,但是其对跨平台的支持较差
- AMQP天然具
有跨平台、跨语言特性
支持消息类型
JMS
支持TextMessage、MapMessage 等复杂的消息类型AMQP
仅支持 byte[] 消息类型(复杂的类型可序列化后发送
Exchange 交换机
提供的路由算法
- AMQP可以提供多样化的路由方式来传递消息到消息队列 4种交换机类型,6种模式
- JMS 仅支持 队列 和 主题/订阅 方式两种
常见MQ产品:
- ActiveMQ:基于JMS,
早期的MQ框架,现在已经很少使用了
- Kafka:分布式消息系统,高吞吐量
- RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
本篇学习😶
- RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会
RabbitMQ
-
RabbitMQ是由
erlang
语言开发,所以安装环境需要安装erlang
-
基于
AMQP
(Advanced Message Queue 高级消息队列协议
)协议实现的消息队列 -
它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛
RabbitMQ 的工作原理
组成部分:
Producer 消息生产者
- 消息生产者,即生产方客户端,生产方客户端将消息,
通过信道Channel
发送到MQ
Connection 连接对象
-
Producer /Consumer 和 broker 之间的 TCP 连接
程序通过,Connection连接对象将,创建出
Channel信道
:生产者通过 信道 将消息发送给MQ
消费者通过 信道 获取到MQ的消息~
-
Channel 信道:
如果每一次访问 RabbitMQ 都建立一个 Connection,消息量大的时候,对于性能也是巨大的;
Channel 是在 connection 内部建立的逻辑连接,为 Connection 减少了操作系统建立 TCP connection 的开销;
细节不详细介绍
可以理解为是一个,消息数据传递的一个
通到
可以通过它,来创建配置,
生产者|消费者 与MQ通信
声明设置绑定:交换机|队列
Broker 可以认为是 MQ
-
消息队列服务进程
:此进程包括两个部分:Exchange交换机
和Queue队列
-
Exchange交换机
是 RabbitMQ 非常重要的一个部件
一方面它接收来自生产者的消息,另一方面它将消息 推送到队列中
-
Queue 队列
RabbitMQ 内部使用的一种数据结构
队列
队列就像是一个“吸管” 一边吸水一边出水,遵循 “先进先出”原则;
生产者发消息——交换机——转发到队列上
是真正消息存储的地方~
Consumer 消息消费者
- 消息消费者,即消费方客户端,
通关信道Channel
接收MQ转发的消息,并进行相关的处理;
-----发送消息-----
- 生产者通过 Connection 和Broker建立TCP连接。
- Connection 建立 Channel 通道
- 生产者通过信道,将消息发送给Broker(MQ),由Exchange将消息进行转发~
队列中去!
-----接收消息-----
- 消费者通过 Connection 和Broker建立TCP连接
- Connection 建立 Channel 通道
- 消费者监听指定的Queue(队列),当有消息到达Queue时Broker默认将消息,通过 Channel 推送给消费者
Exchange 交换机四种类型 ⭐
RabbitMQ消息传递模型的核心思想是:
-
生产者永远不会将任何消息直接发送到队列,通常生产者甚至不知道消息是否会被传递到任何队列 生产者只能向交换机(Exchange)发送消息
-
交换机是一个非常简单的东西,一边接收来自生产者的消息,另一边将消息推送到队列.
-
RabbitMQ 的交换机具有很多中类型,可以完成很多种复杂的场景操作:
交换机类型:
-
fanout: 广播模式
发布/订阅
,交换机给所有的队列,发送相同的消息; -
direct : 路由模式
routing key
交换机,根据对应的routing key
的队列上发送消息; -
topic: 动态路由模式,可以用过一定的规则定义
roting key
使 交换机动态的多样性选择队列
* 表示一个单词
# 表示任意数量(零个或多个)单词
-
headers: 请求头模式,
目前用的很少了
,就像请求头一样,发送消息时候附带头部数据
,交换机根据消息的头部信息匹配对应的队列;
RabbitMQ环境搭建 ✔
本次搭建是Linux 的 如果有朋友是Win的话可以参考这篇文章:🚀
工具准备🔨:
RabbitMQ是由erlang
语言开发,所以安装环境需要安装 erlang
- erlang-21.3.8.21-1.el7.x86_64.rpm
erlang环境
- rabbitmq-server-3.8.8-1.el7.noarch.rpm
rabbit安装
官网下载,如果没有的话也可以底部本人的网盘下载
环境搭建🏚:
本人使用的是 阿里云服务器
没有的话也可以使用虚拟机… 事先使用连接工具上传了文件
本人喜欢把工具都安装在 /usr/wsm
目录下:
[root@iZj6ciuzx7luldnazt4iswZ ~]# cd /
[root@iZj6ciuzx7luldnazt4iswZ /]# ls
bin dev home lib lost+found mnt patch root sbin sys usr www
boot etc install.sh lib64 media opt proc run srv tmp var
[root@iZj6ciuzx7luldnazt4iswZ /]# cd usr
[root@iZj6ciuzx7luldnazt4iswZ usr]# ls
bin etc games include lib lib64 libexec local sbin share src tmp
[root@iZj6ciuzx7luldnazt4iswZ usr]# mkdir wsm
[root@iZj6ciuzx7luldnazt4iswZ usr]# ls
bin etc games include lib lib64 libexec local sbin share src tmp wsm
[root@iZj6ciuzx7luldnazt4iswZ usr]# cd wsm
[root@iZj6ciuzx7luldnazt4iswZ wsm]# ls
erlang-21.3-1.el7.x86_64.rpm rabbitmq-server-3.8.8-1.el7.noarch.rpm #上传的两个文件
解压安装:
# 解压安装 erlang
rpm -ivh erlang-21.3.8.21-1.el7.x86_64.rpm
# 云下载一个 初始化一些配置, 过程比较慢请耐心等待~, 在这之后才可以进行 安装 RabbitMQ
yum install socat -y
# 解压安装 rabbitmq
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
ok ,安装完毕了解一些 RabbitMQ 命令:
# 启动服务
systemctl start rabbitmq-server
# 查看服务状态
systemctl status rabbitmq-server
# 开机自启动
systemctl enable rabbitmq-server
# 停止服务
systemctl stop rabbitmq-server
# 重启服务
systemctl restart rabbitmq-server
注意:这里只是把RabbitMQ 服务
给搭建好了,为了方便操作我们还需要安装一个web控制面板
# 安装web控制面板
rabbitmq-plugins enable rabbitmq_management
# 安装完毕以后,重启服务即可
systemctl restart rabbitmq-server
# 访问 http://服务器ip:15672 ,用默认账号密码(guest)登录,出现权限问题
# 默认情况只能在 localhost 本机下访问,所以需要添加一个远程登录的用户
# 创建账号和密码: admin 123456
rabbitmqctl add_user admin 123456
# 设置用户角色,用户级别: administrator monitoring policymaker managment
rabbitmqctl set_user_tags admin administrator
# 为用户添加资源权限
# set_permissions [-p <vhostpath>] <user> <conf> <write> <read> # 添加配置、写、读权限
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
##### 扩展一些命令:#####
关闭应用的命令为: rabbitmqctl stop_app
清除的命令为: rabbitmqctl reset
重新启动命令为: rabbitmqctl start_app
- 如果是阿里云的服务器
别忘记开启端口 还有 关闭防火墙~
用户级别:
- administrator:可以登录控制台、查看所有信息、可以对 rabbitmq 进行管理
- monitoring:监控者 登录控制台,查看所有信息
- policymaker:策略制定者 登录控制台,指定策略
- managment:普通管理员 登录控制台
主要端口介绍:阿里云建议将这些都打开~
-
4369 – erlang发现口
-
5672 – client端通信口
-
15672 – 管理界面ui端口
-
25672 – server间内部通信口
测试是否可以访问:
访问页面:
-
Overview
概览 RabbitMQ 的整体情况,也可以查看
集群各个节点的信息 情况
MQ 各个端口映射信息
-
Connection
该 选项专栏 下是MQ 与各个
生产者
消费者
连接情况. -
Channels
这里展示,各个
通道 与 连接的关系
-
Exchanage
展示所有的 交换机
-
Queue
展示所有的 队列
-
Admin
这里管理着,MQ 所有的操作用户~
RabbitMQ 管理页面:
Overview
Connections
- Name 连接名
点击连接名, 还可以查看详细的信息~
- User name 当前连接登录MQ 的用户
- State 当前连接的状态,
running 运行
idle 空闲
- SSL|TLS 是否使用的是
SSL|TLS协议
- Peotocol
AMQP 0-9-1
指的是AMQP 的协议版本号 - Channels 当前连接创建通道的 通道总数
- From client 每秒发出的消息数
- To client 每秒接收的消息数
Channels
记录各个连接的信道:
一个连接IP 可以有多个信道
多个通道通过多线程实现,不相互干扰 我们在 信道中创建:队列 交换机 ...
生产者的通道一般使用完之后会立马关闭,消费者是一直监听的…
-
Channel 通道名称
-
User Name 该通道,创建者 用户名
-
Model 通道的
确认模式
C confirm模式
T 表示事务
-
State 通道当前的状态
running 运行
idie 空闲
-
Unconfirmed 待确认的消息数
-
Prefetch
预先载入
Prefetch 表示每个消费者最大的能承受的未确认消息数目
简单来说就是用来指定一个消费者一次可以从 RabbitMQ 中获取多少条消息并缓存在消费者中,
一旦消费者的缓冲区满了,RabbitMQ 将会停止投递新的消息到该消费者中直到它发出有消息被 ack 了
消费者负责不断处理消息,不断 ack,然后只要
UnAcked
数少于Prefetch * consumer
数目,RabbitMQ 就不断将消息投递过去 -
Unacker 待 ack 的消息数
-
publish 消息生产者发送消息的
速率
-
confirm 消息生产者确认消息的
速率
-
unroutable
drop
表示消息,未被接收,且已经删除的消息. -
deliver / get 消息消费者获取消息的
速率
-
ack 消息消费者 ack 消息的速率.
MQ 的 ACK机制:100%消息消费!
Exchange
Queue
- Name 表示消息队列的名称
- Type 消息队列的类型…
- Features:表示消息队列的特性,D 表示消息队列持久化
- State:表示当前队列的状态,running 表示运行中;idle 表示空闲
- Ready:表示待消费的消息总数
- Unacked:表示待应答的消息总数
- Total:表示消息总数 Ready+Unacked
- incoming:表示消息进入的速率
- deliver/get:表示获取消息的速率
- ack:表示消息应答的速
Admin
Java 集成 RabbitMQ 案例
创建一个Maven项目并使用 git 进行管理, wlog.md
文件进行着项目日志的记录✍~
引入RabbitMQ
的依赖:
pom.xml
<dependencies>
<!-- rabbitMQ 依赖 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
</dependencies>
简单模式 Hello Word:
如图,显而易见,非常简单就是一个一发一读
的过程…
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接受者,会一直等待消息到来。
- queue:消息队列,图中红色部分。可以缓存消息;生产者向其中投递消息,消费者从其中取出消息
发送者
Producer.Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/** 消息生产者 **/
public class Producer
// 定义队列名称
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.243.109.199");
factory.setUsername("admin");
factory.setPassword("123456");
//channel 实现了自动 close 接口 自动关闭 不需要显示关闭
//创建连接对象
Connection connection = factory.newConnection();
//根据连接对象,获取信道
Channel channel = connection.createChannel();
/**设置消息队列的属性!
* queue :队列名称
* durable :是否持久化 如果持久化,mq重启后队列数据还在! (队列是在虚拟路径上的...)
* exclusive :队列是否独占此连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
* autoDelete :队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
* arguments :队列参数 null,可以设置一个队列的扩展参数,需要时候使用!比如:可设置存活时间
* */
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**发送消息,参数:
* exchange :指定的交换机,不指定就会有默认的....
* routingKey :路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机routingKey设置为队列的名称
* props :消息包含的属性: 后面介绍,可以是一个一个对象... 消息持久化配置...
* body :发送的消息,AMQP以字节方式传输...
* */
channel.basicPublish("", QUEUE_NAME, null, "Hello Word你好世界".getBytes());
System.out.println("消息发送完毕");
消费者
Consumer.Java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/** 消息消费者 **/
public class Consumer
// 定义队列名称
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.243.109.199");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println("等待接收消息.........");
//收到消息后用来处理消息的回调对象
DeliverCallback deliverCallback = (consumerTag, delivery) ->
String message = new String(delivery.getBody());
System.out.println(message);
;
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) ->
System.out.println("消息消费被中断");
;
/**
* 消费者消费消息 - 接受消息
* queue 消费哪个队列
* autoAck 消费成功之后是否要自动应答 true 代表自动应答 false 手动应答,要通过编程实现回复验证,这就是Unacked 为返回ack的数据
* deliverCallback 消费方法,当消费者接收到消息要执行的方法, 参数是一个函数式接口可以使用 lambda表达式~
* cancelCallback 消息被取消时的回调
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
-
建议先启动
消费者
在启动发送者
可以看到,消费者启动之后,在等待 发生者 发送消息,发送者启动发送消息,
消费者控制台会立刻接收到消息!
-
MQ 发送者一般情况下都不会直接忘队列发消息
这种情况下MQ 都会有一个默认的交换机~
工作模式 Work Queues
工作模式 相当于 简单模式的 升级版!
- 多个消费者,对应一个发送者,发送者 产生的消息存在队列种,队列会以
复杂均衡形式
轮询的发送给多个消费者
一般应用于:发送方事务简单,接收方事务复杂…
美团外卖:用户下单——后台内部要联系商家 骑手 生产订单 处理...
抽取工作类:
因为上面示例我们知道,创建交换机|队列 需要Channel信道
交换机 队列是创建在信道里面的
- 而每次创建交换机的时候,都要创建一次
Connection
Channel
- 于是我们可以将它抽离出一个工具类
MQChannelUtil.Java
MQChannelUtil.Java
- com.wsm目录下创建一个
util包
专门用来存储工具类🛠
import com.rabbitmq.client.Channel; //导入MQ的包~
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/** RabbitMQ 连接配置类: **/
public class MQChannelUtil
//得到一个连接的 channel
public static Channel getChannel() throws Exception
//创建一个连接工厂, 设置连接: IP 端口 用户 密码
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("42.192.149.71");
// factory.setPort("设置对应的端口,默认就是: 5672");
factory.setUsername("admin");
factory.setPassword("123456");
//创建连接对象 信道对象
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
发送者
Producer.Java
import com.rabbitmq.client.Channel;
import com.wsm.Util.MQChannelUtil;
import java.util.Scanner;
/** 消息生产者 **/
public class Producer
// 定义队列名称
private final static String QUEUE_NAME = "Word";
public static void main(String[] args) throws Exception
// 工具类创建一个信道
Channel channel = MQChannelUtil.getChannel();
// Java控制台测试法消息:
Scanner scanner = new Scanner(System.in);
//创建交换机
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 循环多次发布消息:
while (scanner.hasNext())
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("消息发送完毕");
消费者1
Consumer1.Java
import com.rabbitmq.client.*;
import com.wsm.Util.MQChannelUtil;
/** 消息消费者 **/
public class Consumer1
// 定义队列名称
private final static String QUEUE_NAME = "Word";
public static void main(String[] args) throws Exception
// 工具类创建一个信道
Channel channel = MQChannelUtil.getChannel();
//收到消息后用来处理消息的回调对象
DeliverCallback deliverCallback = (consumerTag, delivery) ->
String message = new String(delivery.getBody());
System.out.println(message);
;
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) ->
System.out.println("消息消费被中断");
;
/** 消费者消费消息 - 接受消息: 注意参数两个回调函数~ */
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
消费者2
Consumer2.Java
和消费者1 一模一样换一个名字,两个消费者监听一个队列 进行数据处理....
结果测试:
消息被轮询消费
:
- 通过程序执行发现生产者总共发送 4 个消息,消费者 1 和消费者 2 分别分得两个消息,并且是按照有序的一个接收一次消息
- MQ 发送消息,一般情况下只会被一个
消费者执行消费
消费者执行之后, 队列就会将消息删除,(ACK机制...
- 后面可以通过,交换机模式完成,一个消息被多个消费者消费…
消息确认接收机制 ACK
消息一旦被消费者接收,队列中的消息就会被删除
RabbitMQ怎么知道消息被接收了呢?
-
如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败!
-
但是RabbitMQ无从得知,这样消息就丢失了!
因此,RabbitMQ有一个ACK机制
- 当消费者获取消息后,会向RabbitMQ发送回执ACK 告知消息已经被接收。不过这种回执ACK分两种情况:
- 自动ACK: 消息一旦被接收,消费者自动发送ACK
- 手动ACK: 消息接收后,不会发送ACK,需要手动调用
自动ACK
-
RabbitMQ
默认此种模式:
消息发送后立即被认为已经传送成功!
消费者 接收到消息,就向队列发送ack,队列立刻就删除消息
-
这种模式需要在高吞吐量和数据传输安全性方面做权衡 仅适用在消费者可以高效并以 某种速率能够处理这些消息的情况下使用
手动ACK
-
消息接收后,不会发送ACK,需要手动代码进行调用
待消费者 执行完毕之后,在通过代码向 队列发送ack,队列接收到ack 之后会将消息删除!
-
channel.basicAck(long deliveryTag,boolean multiple); 用于肯定确认
RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
参数1 long类型,表示处理的消息标识,MQ没发送一个消息都一个对于该消息的唯一标识…
就像序列化 序列号一样,用于网络传输..
参数2 boolean类型,表示是否支持批量处理
-
channel.basicNack(deliveryTag, false, true); 用于否定确认, 消费者 消息执行过程中失败,或服务器挂机…
参数1
同上,消息的唯一标识
参数2
表示是否支持批量处理
参数3
requeue true则重新入队列 false丢弃或者进入死信队列
-
channel.basicReject(deliveryTag, true); 用于否定确认
参数1
同上
参数2
requeue true则重新入队列 false丢弃或者进入死信队列
与 Channel.basicNack 相比少一个参数,不可以进行批量处理…
Multiple 批量消息处理:
-
true 代表批量应答处理
比如,现在队列上存在
1 2 3 4
四个消息,都发送给了消费者,而消费者逐一处理,4
结束了.不管是否
ACK|NACK
都直接将,其它的1 2 3
都以相同的,方式进行 批量处理!好处:在MQ 服务,稳定的时候,支持大量的消息处理速度…
缺点,容易造成数据丢失💀...
-
flase
建议使用,不批量应答
就是, 一次只处理当前消息的
ACK|NACK
消息自动重新入队
消费者设置了手动ACK 之后....
如果消费者由于某些原因失去连接 其通道已关闭,连接已关闭或 TCP 连接丢失
导致消息未发送 ACK 确认
- 消费者监听 队列消息,消费者开始处理,但是处理过程中,消费者突然与MQ 连接断开
消费者服务挂了
- MQ 正常情况下会与 消费者建立连接,当消费者突然断开,一段时间没有返回,消息处理的 ack,MQ就会当作消费者出现故障. 将消息重新交给其它消费者处理!心跳机制♥
生产者
Producer.Java
import com.rabbitmq.clientRabbitMq高级特性之延迟队列 通俗易懂 超详细 内含案例