消息队列:RabbitMQ
Posted code果冻爽
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列:RabbitMQ相关的知识,希望对你有一定的参考价值。
消息队列 Message Queue
一、 消息中间件概述
1.大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力
2.消息服务中两个重要概念:
- 消息代理(message broker)和目的地(destination)
- 当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。
3.消息队列主要有两种形式的目的地
- 队列(queue):点对点消息通信(point-to-point)
- 主题(topic):发布(publish)/订阅(subscribe)消息通信
4.点对点式
- 消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获 取消息内容,消息读取后被移出队列
- 消息只有唯一的发送者和接受者,但并不是说只能有一个接收者
5.发布订阅式:
- 发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个 主题,那么就会在消息到达时同时收到消息
6.JMS(Java Message Service)JAVA消息服务:
- 基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现
7.AMQP(Advanced Message Queuing Protocol)
-
高级消息队列协议,也是一个消息代理的规范,兼容JMS
-
RabbitMQ是AMQP的实现
8.Spring支持
-
spring-jms提供了对JMS的支持
-
spring-rabbit提供了对AMQP的支持
-
需要ConnectionFactory的实现来连接消息代理
-
提供JmsTemplate、RabbitTemplate来发送消息
-
@JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上监听消息代理发布的消息
-
@EnableJms、@EnableRabbit开启支持
9.Spring Boot自动配置
-
JmsAutoConfiguration
-
RabbitAutoConfiguration
10.市面上的MQ产品
•ActiveMQ、RabbitMQ、RocketMQ、Kafka
提到消息中间件就要想到:异步、消峰、解耦
消息队列主要分为两大类:一类是JMS(Java Message Service)JAVA消息服务
,另一类是:AMQP(Advanced Message Queuing Protocol)
二、 RabbitMQ
2.1 RabbitMQ简介
RabbitMQ是一个由erlang
开发的AMQP(Advanved Message Queue Protocol)
的开源实现。
2.2 核心概念
Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成, 这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可 能需要持久性存储)等。
Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。
Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
Exchange有4种类型:direct(默认)
,fanout
, topic
, 和headers
,不同类型的Exchange转发消息的策略有所区别
Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直 在队列里面,等待消费者连接到这个队列将其取走。
Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交 换器理解成一个由绑定构成的路由表。
Exchange 和Queue的绑定可以是多对多的关系。
Connection
网络连接,比如一个TCP连接。
Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道 发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都 是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost
本质上就是一个 mini 版的 RabbitMQ 服务器,拥 有自己的队列、交换器、绑定和权限机制。vhost
是 AMQP 概念
的基础,必须在连接时 指定,RabbitMQ
默认的 vhost 是 / 。
Broker
表示消息队列服务器实体。
2.3 docker安装rabbitmq
安装命令:
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
对应端口号解释:
- 4369, 25672 (Erlang发现&集群端口)
- 5672, 5671 (AMQP端口)
- 15672 (web管理后台端口)
- 61613, 61614 (STOMP协议端口)
- 1883, 8883 (MQTT协议端口)
可访问 ip地址 : 15672 访问控制页面
2.4 RabbitMQ运行机制
AMQP中的消息路由
AMQP 中消息的路由过程和 Java 开 发者熟悉的 JMS 存在一些差别, AMQP 中增加了 Exchange 和 Binding 的角色。
生产者把消息发布 到 Exchange 上,消息最终到达队列 并被消费者接收,而 Binding 决定交 换器的消息应该发送到那个队列。
Exchange类型
Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、 fanout、topic、headers 。
headers 匹配 AMQP 消息的 header 而不是路由键, headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接 看另外三种类型。
Direct Exchange
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器 就将消息发到对应的队列中。路由键与队 列名完全匹配,如果一个队列绑定到交换 机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发 “dog.puppy”,也不会转发“dog.guard” 等等。它是完全匹配、单播的模式
Fanout Exchange
每个发到 fanout 类型交换器的消息都 会分到所有绑定的队列上去。
fanout 交换器不处理路由键,只是简单的将队列 绑定到交换器上,每个发送到交换器的 消息都会被转发到与该交换器绑定的所 有队列上。
很像子网广播,每台子网内 的主机都获得了一份复制的消息。
fanout 类型转发消息是最快的。
Topic Exchange
topic 交换器通过模式匹配分配消息的 路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。
它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“*”。
#匹配0个或多个单词,* 匹配一个单词。
三、 RabbitMQ整合SpringBoot
向pom.xml中引入springboot-starter:
<!-- 引入RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
观察RabbitAutoConfiguration
类可以看出,该配置类向容器中注入了几个重要的Bean对象:CachingConnectionFactory
、RabbitTemplate
、AmqpAdmin
(1) CachingConnectionFactory
RabbitTemplate
使用CachingConnectionFactory
作为连接工厂
配置类上标有这样的注解:@EnableConfigurationProperties(RabbitProperties.class)
向容器中注入CachingConnectionFactory
的代码中是从配置文件中加载配置信息的。
spring.rabbitmq
为配置的前缀,可以指定一些端口号,ip地址等信息。
#配置域名和端口号
spring.rabbitmq.host=192.168.190.131
spring.rabbitmq.port=5672
#配置虚拟地址
spring.rabbitmq.virtual-host=/
(2) AmqpAdmin
AmqpAdmin
是org.springframework.amqp.core
下的类,通过此类,可以用代码的方式创建Exchange、Queue还有Binding。
@Autowired
AmqpAdmin amqpAdmin;
@Test
public void createBinding()
// String destination 目的地
// DestinationType destinationType 绑定类型:队列/交换机
// String exchange 交换机名称
// String routingKey 路由键
//、Map<String, Object> arguments 参数
Binding binding = new Binding("hello.queue" , Binding.DestinationType.QUEUE, "hello", "hello.queue",null);
amqpAdmin.declareBinding(binding);
@Test
public void createMQ()
/**
* @param name 队列的名称
* @param durable 是否持久化队列
* @param exclusive 是否声明为一个独占队列
* @param autoDelete 如果服务不在使用时是否自动删除队列
*/
Queue queue = new Queue("hello.queue", true, false, false);
String s = amqpAdmin.declareQueue(queue);
log.info("创建queue成功... ", queue);
@Test
public void createExchange()
// String name 交换机名称
// boolean durable 是否持久化
// boolean autoDelete 是否自动删除
Exchange exchange = new DirectExchange("hello", true, false);
amqpAdmin.declareExchange(exchange);
log.info("创建exchange成功...");
(2) RabbitTemplate
通过RabbitTemplate类中的方法,可以像使用Rabbit客户端一样向队列发送消息以及更多其他的操作,并且多个重载的”send“(发送消息)方法。
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void test()
// 发送消息
rabbitTemplate.convertAndSend("hello", "hello.queue" ,"msg");
发送的消息不仅可以是一个序列化的对象,还可以是Json格式的文本数据。
通过指定不同的MessageConverter
来实现,可以向容器中注入我们想要的MessageConverter
从而使用。
@Configuration
public class MyRabbitConfig
@Bean
public MessageConverter messageConverter()
return new Jackson2JsonMessageConverter();
(3) @RabbitListener和@RabbitHandler注解
@RabbitListener
注解和@RabbitHandler
都可以接受消息队列中的消息,并进行处理。
@RabbitListener
注解:
可以标记方法或类上进行使用
自定义方法的参数可以为以下类型:
1、Message message:原生消息详细信息。头 + 体
2、T <发送的消息的类型> 可以是我们自定义的对象
3、Channel channel :当前传输数据的信道。
@RabbitListener(queues = "hello.queue")
public String receiveMessage(Message message, OrderEntity content)
//消息体信息
byte[] body = message.getBody();
// 消息头信息
MessageProperties messageProperties = message.getMessageProperties();
log.info("收到的消息: ", content);
return "ok";
同时要注意:Queue
可以由很多方法来监听,只要收到消息,队列就删除消息,并且只能有一个方法收到消息。并且一个方法接收消息是一个线性的操作,只有处理完一个消息之后才能接收下条消息。
@RabbitHandler
注解:
@RabbitHandler标在方法上。
@RabbitHandler标记的方法结合@RabbitListener,@RabbitHandler使用可以变得更加灵活。
比如说,当两个方法对一个消息队列进行监听时,用于监听的两个方法用于接收消息内容的参数不同,根据消息的内容可以自动的确定使用那个方法。
@Slf4j
@Controller
@RabbitListener(queues = "hello.queue")
public class RabbitController
@RabbitHandler
public String receiveMessage(Message message, OrderReturnReasonEntity content)
//消息体信息
byte[] body = message.getBody();
// 消息头信息
MessageProperties messageProperties = message.getMessageProperties();
log.info("收到的消息: ", content);
return "ok";
@RabbitHandler
public String receiveMessage2(Message message, OrderEntity content)
//消息体信息
byte[] body = message.getBody();
// 消息头信息
MessageProperties messageProperties = message.getMessageProperties();
log.info("收到的消息: ", content);
return "ok";
四、 RabbitMQ消息确认机制
概念:
-
保证消息不丢失,可靠抵达,可以使用事务消息,但是性能会下降250倍,为此引入确认机制
-
publisher confirmCallback 确认模式
-
publisher returnCallback 未投递到 queue 退回模式
-
consumer ack机制
4.1 消息确认机制-可靠抵达(发送端)
① ConfirmCallback
ConfirmCallback
和RetruhnCallback
一样都是RabbitTemplate
内部的接口。
消息只要被 broker 接收到就会执行 confirmCallback,如果是 cluster 模式,需要所有 broker 接收到才会调用 confirmCallback。
也就是说当消息到达RabbitMQ的服务器就会执行回调方法。
首先需要修改配置文件:
spring.rabbitmq.publisher-confirms=true
然后准备一个发送消息使用的接口和两个用来监听消息队列并接收消息的方法
发送消息接口:
@RestController
public class SendMsgController
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg")
public String sendMsg()
for (int i = 0; i < 10; i++)
if (i % 2 == 0)
OrderEntity orderEntity = new OrderEntity();
orderEntity.setId(1L);
orderEntity.setMemberUsername("Tom");
orderEntity.setReceiveTime(new Date());
rabbitTemplate.convertAndSend("hello-java-exchange", "hello.news", orderEntity, new CorrelationData(UUID.randomUUID().toString()));
else
OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
orderReturnReasonEntity.setCreateTime(new Date());
orderReturnReasonEntity.setId(2L);
orderReturnReasonEntity.setName("test");
orderReturnReasonEntity.setSort(1);
rabbitTemplate.convertAndSend("hello-java-exchange", "hello.news", orderReturnReasonEntity, new CorrelationData(UUID.randomUUID().toString()));
return "ok";
监听消息队列并接收消息的方法:
@RabbitListener(queues = "hello.news")
@Slf4j
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService
@RabbitHandler
public void receiveMessage1(Message message, OrderReturnReasonEntity content, Channel channel)
//消息体信息
byte[] body = message.getBody();
// 消息头信息
MessageProperties messageProperties = message.getMessageProperties();
System.out.println("receiveMessage1 接收消息: " + content);
@RabbitHandler
public void receiveMessage2(Message message, OrderEntity content, Channel channel)
//消息体信息
byte[] body = message.getBody();
// 消息头信息
MessageProperties messageProperties = message.getMessageProperties();
System.out.println("receiveMessage2 接收消息: " + content);
第三步,在配置类中定制RedisTemplate:
@Configuration
public class MyRabbitConfig
@Autowired
RabbitTemplate rabbitTemplate;
@PostConstruct // 该注解表示在初始化构造器之后就调用,初始化定制 RabbitTemplate
public void initRabbitTemplate()
// 设置确认回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback()
/**
*
* @param correlationData 当前消息的唯一相关数据 (这个是消息的唯一id)
* @param ack 消息是否成功收到
* @param cause 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause)
System.out.println("ConfirmCallback... correlationData: [" + correlationData + "] ==> ack: [" + ack + "] ==> cause: [" + cause + "]");
);
然后访问localhost:9000/sendMsg,就会发送消息,观察结果:
用于接收消息的两个方法都接收到了消息,并且自定义的ConfirmCallback
回调方法会打印相关信息。
② ReturnCallback
被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递到目标 queue 里。所以需要用到接下来的 returnCallback
。
如果在交换机将消息投递到queue的过程中,发生了某些问题,最终导致消息投递失败,就会触发这个方法。
为定制的RabbitTemplate
添加这个方法:
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback()
/**
* @param message 投递失败的消息的详细信息
* @param replyCode 回复的状态码
* @param replyText 回复的文本内容
* @param exchange 但是这个消息发给哪个交换机
* @param routingKey 当时这个消息使用哪个路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey)
System.out.println("FailMessage: [" + message + "] ==> replyCode: [" + replyText + "] ==> exchange: [" + exchange + "] ==> routingKey: [" + routingKey + "]");
);
我们在发送消息的一端故意写错路由键,致使exchange投递消息失败。最后会看到回调方法ReturnCallback
中打印的内容:
FailMessage: [(Body:'"id":2,"name":"test","sort":1,"status":null,"createTime":1641608721639' MessageProperties [headers=spring_returned_message_correlation=b6b21f2d-73ad-473d-9639-feec76953c7b, __TypeId__=com.atguigu.gulimall.order.entity.OrderReturnReasonEntity, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])] ==> replyCode: [NO_ROUTE] ==> exchange: [hello-java-exchange] ==> routingKey: [hello.news1]
补充:在发送消息的时候还可以指定一个CorrelationData
类型的参数(可以回顾上文的发送消息的方法),这个CorrelationData
类的构造器参数可以填一个UUID,代表消息的唯一id,在重写ConfirmCallback
中的方法的第一个参数就是这个,通过这个参数就可以获取消息的唯一id。
注意:监听方法返回值必须为void,否则控制台会不断打印报错信息。(血的教训)
4.2 消息确认机制-可靠抵达(消费端)
ACK(Acknowledge)消息确认机制
消费者获取到消息,成功处理,可以回复Ack给Broker
- basic.ack用于肯定确认;broker将移除此消息
- basic.nack用于否定确认;可以指定broker是否丢弃此消息,可以批量
- basic.reject用于否定确认;同上,但不能批量
在默认状况下,ACK消息确认机制是当消息一旦抵达消费方法就会直接出队(删除),但是如果在消息消费过程中服务器宕机了,这些消息也会被删除,这就造成了消息丢失的问题。
通过配置可以开启消息需要经过手动确认,才能从队列中删除消息
#手动ack消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
改写方法:
@RabbitHandler
public void receiveMessage2(Message message, OrderEntity content, Channel channel)
//消息体信息
byte[] body = message.getBody();
// 消息头信息
MessageProperties messageProperties = message.getMessageProperties();
long deliveryTag = messageProperties.getDeliveryTag();
//手动接收消息
//long deliveryTag相当当前消息派发的标签,从messageProperties中获取,并且在Channel中自增的
//boolean multiple 是否批量确认
try
channel.basicAck(deliveryTag, false);
catch (Exception e)
e.printStackTrace();
System.out.println("receiveMessage2 接收消息: " + content);
我们在上方的代码打上断点并观察RabbitMQ客户端的状况:
对中总共有5条消息,并且进入了Unacked,即未被确认的状态。
但是这里使用debug模式启动然后关掉服务模拟服务器宕机会发生一个问题,就是在关闭服务之前,idea会将未执行完的方法先执行完再关闭服务。
所以可以在cmd杀掉进程模拟宕机。
这时,由于打了断点,没有走到消息确认的那一行代码,随机,服务器宕机,所有没有确认的消息都会从Unacked的状态回调Ready的状态。
有接收消息的方法就有拒绝消息的方法:basicNack
和basicReject
//long deliveryTag 当前消息派发的标签
//boolean multiple 是否批量处理
//boolean requeue 拒绝后是否将消息重新入队
channel.basicNack(deliveryTag, false, true);
channel.basicReject(deliveryTag, true);
basicNack
和basicReject
都可以用来拒绝消息,但是basicNack
比basicReject
多了一个参数boolean multiple
(是否批量处理)
如果将requeue
设置为true,被拒绝的消息就会重新入队等待消费。
五、 RabbitMQ延时队列(实现定时任务)
场景:
比如未付款订单,超过一定时间后,系统自动取消订单并释放占有物品。
常用解决方案:
spring的 schedule 定时任务轮询数据库
缺点:
消耗系统内存、增加了数据库的压力、存在较大的时间误差
解决:rabbitmq的消息TTL和死信Exchange结合
(1) 消息的TTL(Time To Live)
消息的TTL就是消息的存活时间。
RabbitMQ可以对队列和消息分别设置TTL。
对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。
如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的
TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration
字段或者x-message-ttl
属性来设置时间,两者是一样的效果。
(2) Dead Letter Exchanges(DLX)死信路由
一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列, 一个路由可以对应很多队列。
什么是死信?
- 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不 会被再次放在队列里,被其他消费者使用。*(basic.reject/ basic.nack)*requeue=false
- 上面的消息的TTL到了,消息过期了。
- 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。
**Dead Letter Exchange(死信路由)**其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有 消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息 被路由到某一个指定的交换机,结合二者,其实就可以实现一个延时队列。
手动ack&异常消息统一放在一个队列处理建议的两种方式
- catch异常后,手动发送到指定队列,然后使用channel给rabbitmq确认消息已消费
- 给Queue绑定死信队列,使用nack(requque为false)确认消息消费失败
延时队列的实现:
方式一:设置一个有过期时间的消息队列
方式二:发送的消息赋予过期时间。
但是基于RabbitMQ对消息的惰性处理,通常选
以上是关于消息队列:RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章