RabbitMQ核心功能介绍
Posted weixin_44991304
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ核心功能介绍相关的知识,希望对你有一定的参考价值。
RabbitMQ核心功能
- 一.MQ的概念与功能介绍
- 二.RabbitMQ的介绍和入门案例
- 三.RabbitMQ的工作队列
- 四.RabbitMQ的工作模式
- 五.RabbitMQ的发布确认
- 六.RabbitMQ的死性队列
- 七.RabbitMQ的延迟队列
本文对RabbitMQ核心功能的介绍,没有介绍RabbitMQ的安装与集群,案列代码采用原生的Java代码和springboot两种形式,内容参考自RabbitMQ中文文档黑马程序员和尚硅谷RabbitMQ教程的笔记,还有一些自己的理解。
一.MQ的概念与功能介绍
1.MQ是什么?
🐘🐘 MQ(message queue),从字面意思上看就是消息·队列,满足队列的FIFO 先入先出特点,只不过队列中存放的内容是message,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不用依赖其他服务。
什么叫上下游传递消息:就好比老板给我发微信让我加班,老板就是上游,我就是下游,我和老板之间的通信就可以称为上下游传递消息。又好比两个分布式系统之间进行通信A系统(又称为生产者)给B系统(消费者)发送消息只需要依赖 MQ,不用依赖其他服务。
2.为什么要用MQ呢?
🐸🐸MQ优势1.应用解耦:因为系统的耦合性越高,容错性就越低,可维护性就越低,那么我们就要进行应用解耦。
假如有如下分布式系统用户购买完商品去订单系统下单,订单系统去操作库存系统修改订单,操作支付系统扣钱,提醒物流系统发货,如果其中有任何一个系统挂了那么跟着订单系统也挂了。再比如又添加了一个X系统那么又要去修改订单系统的代码操作X系统。
解决办法:使用MQ,万物皆可中间加一层,没有加一层解决不了的问题如果有那就再加一层。
此时如果用户再去下订单,订单系统只需要发一条消息给MQ就可以,MQ再去操作其他系统,订单系统就可以发一条消息告诉客户下单成功,其他的事情都交给MQ解决。假如此时库存系统又挂了,MQ可以等到库存系统修复了再发消息给库存系统让他减库存即可。此时订单系统与其他的系统之间耦合度便降低了,如果再添加X系统那么也还是通过MQ发送消息给X系统,此时系统的可扩展性就提高了。
🐻🐻MQ优势2.异步提速:同样用上面用户下单的例子来说明问题。
用户购买商品通过订单系统下单,此时订单系统要去操作库存系统支付系统物流系统然后再去操作自己的数据库,一系列操作完成后再去告诉用户下单成功。此时过去了920ms,十分影响用户体验。
加入MQ此时订单系统把订单消息发送给MQ然后就可以去操作数据库和返回消息给用户了。其他事情都交给MQ去做。用户点击完下单按钮后,只需等待25ms就能得到下单响应 (20 + 5 = 25ms)。
提升用户体验和系统吞吐量(单位时间内处理请求的数目)。
🐼🐼MQ优势三:流量削峰或者称为削峰填谷:A系统每秒最大处理1000个请求突然来了5000个请求,此时系统扛不住就要挂了。
解决方案:把请求发往MQ,然后A系统每秒从MQ中拉取1000个请求就行处理。使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。使用MQ后,可以提高系统稳定性。
MQ的劣势
💀💀系统可用性降低
系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。需要考虑如何保证MQ的高可用?
💀💀 系统复杂度提高
MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。存在一下问题:
如何保证消息没有被重复消费?怎么处理消息丢失情况?怎么保证消息传递的顺序性?
💀💀一致性问题
A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?
3.MQ有优势有劣势,那MQ什么时候可以使用?
- 生产者不需要从消费者处获得反馈。
- 容许短暂的不一致性。
- 确实是用了有效果。能够有解耦、提速、削峰这些方面的收益,并且带来的收益超过加入MQ后,管理MQ的成本。
4. 几种常见的MQ
MQ的选择
-
Kafka:
主要特点是基于 Pull 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。大型公司建议可以选用,如果有日志采集功能,肯定是首选 kafka 了。 -
RocketMQ:
天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。RoketMQ 在稳定性上可能更值得信赖,这些业务场景在阿里双 11 已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择 RocketMQ。 -
RabbitMQ
结合 erlang 语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高,管理界面用起来十分方便,如果你的数据量没有那么大,中小型公司优先选择功能比较完备的 RabbitMQ。
二.RabbitMQ的介绍和入门案例
1.RabbitMQ的介绍
RabbitMQ 是Rabbit 技术公司基于 AMQP 标准开发的,采用 Erlang 语言开发,是一个消息中间件它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据。
2. AMQP是什么
AMQP(高级消息队列协议)是一个网络协议。它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信,模型图如下。
消息(message)被发布者(publisher)发送给交换机(exchange),交换机常常被比喻成邮局或者邮箱。然后交换机将收到的消息根据路由规则分发给绑定的队列(queue)。最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。
3.RabbitMQ的工作原理图和相关概念解释
因为RabbitMQ是基于 AMQP 标准开发的所以其原理图与AMQP模型图相差无几。
🐴🐴1.Producer: 产生数据发送消息的程序是生产者
🐑🐑2.Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
🐍🐍3.Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等
🐥🐥4.Connection:publisher/consumer 和 broker 之间的 TCP 连接
🐧🐧5.Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP ,Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的Connection 极大减少了操作系统建立 TCP connection 的开销
🐝🐝6.Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个由交换机类型决定
🐌🐌7.Queue:队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
🐀🐀8.Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
🐁🐁8.Consumer:消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。
注意:生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
4.RabbitMQ入门程序
🐳🐳1.普通jiava程序,引入相关依赖
<dependencies>
<!--rabbitmq 依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
生产者
public class Producer
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("8.142.89.135");
factory.setUsername("guest");
factory.setPassword("guest");
//channel 实现了自动 close 接口 自动关闭 不需要显示关闭
try(Connection connection = factory.newConnection();Channel channel =
connection.createChannel())
/**
* 生成一个队列
* 1.队列名称
* 2.队列里面的消息是否持久化 默认消息存储在内存中
* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
* 5.其他参数
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message="hello world";
/**
* 发送一个消息
* 1.发送到那个交换机
* 2.路由的 key 是哪个
* 3.其他的参数信息
* 4.发送消息的消息体
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息发送完毕");
消费者
public class Consumer
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("8.142.89.135");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println("等待接收消息....");
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback=(consumerTag, delivery)->
String message= new String(delivery.getBody());
System.out.println(message);
;
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback=(consumerTag)->
System.out.println("消息消费被中断");
;
/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
* 3.消费者未成功消费的回调
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
🐬🐬2.SpringBoot入门程序
生产者消费者都引入一下依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
生产者
@Configuration
public class HelloWorldConfig
//交换机名称
public static final String DIRECT_EXCHANGE_NAME="direct_exchange";
//队列名称
public static final String QUEUE_NAME="hello_world";
//声明交换机
@Bean("helloExchange")
public DirectExchange helloExchange()
return new DirectExchange(DIRECT_EXCHANGE_NAME);
//声明队列
@Bean("consoleQueue")
public Queue consoleQueue()
return QueueBuilder.durable(QUEUE_NAME).build();
//绑定关系
@Bean
public Binding helloBinding(@Qualifier("helloExchange") DirectExchange exchange,@Qualifier("consoleQueue") Queue queue)
return BindingBuilder.bind(queue).to(exchange).with("hello");
@SpringBootTest
public class HelloWorldTest
@Resource
private RabbitTemplate rabbitTemplate;
@Test
public void TestSendMsg()
rabbitTemplate.convertAndSend(HelloWorldConfig.DIRECT_EXCHANGE_NAME,"hello","hello world");
消费者
@Component
public class HelloConsumer
@RabbitListener(queues = "hello_world")
public void ListenerQueue(Message message)
String msg = new String(message.getBody());
System.out.println(msg);
启动消费者的主程序控制台下输出:
三.RabbitMQ的工作队列
1.工作队列的概念
WorkQueues工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。
相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
案例分析:我们启动两个消息消费者线程,一个消息生产者线程,我们来看看两个工作线程是如何工作的。
两个工作线程
public class Worker01
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback=(consumerTag, delivery)->
String receivedMessage = new String(delivery.getBody());
System.out.println("接收到消息:"+receivedMessage);
;
CancelCallback cancelCallback=(consumerTag)->
System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
;
System.out.println("C2 消费者启动等待消费......");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
public class Worker02
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback=(consumerTag, delivery)->
String receivedMessage = new String(delivery.getBody());
System.out.println("接收到消息:"+receivedMessage);
;
CancelCallback cancelCallback=(consumerTag)->
System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
;
System.out.println("C1 消费者启动等待消费......");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
生产者
public class Task01
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception
try(Channel channel=RabbitMqUtils.getChannel();)
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//从控制台当中接受信息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext())
String message = scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("发送消息完成:"+message);
通过程序执行发现生产者总共发送 4 个消息,消费者 1 和消费者 2 分别分得两个消息,并且是按照有序的一个接收一次消息,是通过轮询的方式发送消息。
不公平分发机制
🐆🐆RabbitMQ默认情况下是通过轮询的方式发送消息。但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是RabbitMQ 并不知道这种情况它依然很公平的进行分发。
为了避免这种情况,我们可以设置参数 channel.basicQos(1);
1为不公平分发,默认为0是公平分发。
消息预取值
🐩🐩本身消息的发送就是异步发送的,所以在任何时候,channel 上肯定不止只有一个消息,另外来自消费者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。
这个时候就可以通过使用 如下方法设置“预取计数”值来完成的
void basicQos(int prefetchCount);
该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认。
例如,假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何
消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACK,RabbitMQ 将会感知这个情况到并再发送一条消息。
🐪🐪消息应答和 QoS 预取值对用户吞吐量有重大影响。通常,增加预取将提高向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的 RAM 消耗(随机存取存储器)应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同 100 到 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。
预取值为 1 是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境中。对于大多数应用来说,稍微高一点的值将是最佳的。
2.消息应答机制
🐠🐠 问题分析 消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费者的消息,因为它无法接收到。
🐋🐋 为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。
3.消息自动应答模式
🐏🐏消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
使用方式只需要在发送消息时将 autoAck参数设置为true即可。
String basicConsume(String queue,
boolean autoAck, DeliverCallback deliverCallback,
CancelCallback cancelCallback)
4. 消息手动应答模式 可以批量应答并且减少网络拥堵
在发送消息时将 autoAck参数设置为false,并且需要指定应答策略
三种应答策略
//用于肯定确认 RabbitMQ 已知道该消息并且成功的处理消息,
//可以将其丢弃了 第一个参数代表的是消息的标记,
//第二个参数是代表是否批量应答
1. void basicAck(long deliveryTag, boolean multiple);
//用于否定确认 参数1:消息的标记
//参数2:是否批量应答
//参数3:是否重新入队
2.void basicNack(long deliveryTag, boolean multiple, boolean requeue)
//用于否定确认不处理该消息了直接拒绝,可以将其丢弃
// 参数含义同上
3.void basicReject(long deliveryTag, boolean requeue)
Multiple 的解释
multiple 的 true 和 false 代表不同意思
🐐🐐true 代表批量应答 channel 上未应答的消息
🐖🐖false 同上面相比:只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答
手动应答案例演示:
消费者:
public class TaskConsumerOne
public static final String TASK_QUEUE_NAME="task";
public static void main(String[] args) throws Exception
Channel channel = RabbitMqChannel.getChannel();
//是否自动应答
boolean autoAck=false;
//是否公平分发
//channel.basicQos(1);
channel.basicConsume(TASK_QUEUE_NAME,autoAck,(consumerTag以上是关于RabbitMQ核心功能介绍的主要内容,如果未能解决你的问题,请参考以下文章