RabbitMQ-高级

Posted 啵萝蜜多斩

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ-高级相关的知识,希望对你有一定的参考价值。

消息队列 MQ 使用时 存在的问题:

  1. 消息可靠性(如何确保发送的消息至少被消费一次)

  1. 延迟消息(如何实现消息的延迟投递)

  1. 高可用问题(如何避免单点的MQ故障而导致的不可用问题)

  1. 消息堆积问题(如何解决百万消息堆积,无法及时消费的问题)

消息可靠性的原因及解决方案

原因:

  • 消息发送时丢失

  1. 生产者发送的消息未送达交换机

  1. 交换机的消息未送达队列

  • MQ 服务宕机,队列中的消息丢失

  • 消费者接收到消息后还没来得及消费,MQ就宕机了

解决方案:

  1. 生产者确认机制 (publisher confirm)

publisher confirm 机制必须给每个消息这是一个全局唯一ID,消息发送到MQ之后,会返回一个结果给publisher,表示是否成功

返回结果的两种方式:

1, publisher-confirm,发送者确认

消息成功发送给交换机,返回 ack

消息未发送到交换机,返回 nack

2, publisher-return 发送者回执

消息发送到交换机了,但是没有路由给队列,返回 ack,以及路由失败的原因

项目中,配置 发送者 的application.yml
spring:
  rabbitmq:
    publisher-confirm-type: correlated  
    publisher-returns: true
    template:
      mandatory: true

配置说明:

publisher-confirm-type: 开启 publisher-confirm,有两种类型

①,simple :同步等待 confirm 结果,直到超时

②,correlated :异步回调,定义ConfirmCallback ,MQ 返回结果是,会回调这个 ConfirmCallback

publish-returns:开启 publish-return ,基于回调机制,定义ReturnCallback

template.mandatory:定义消息路由失败的策略,

①,true:调用 ReturnCallback

②,false:直接丢弃消息

  1. MQ 持久化

消息发送到队列后,如果MQ突然宕机,也可能导致消息丢失,所以为确保消息安全保存,需开启持久化机制,(三种持久化机制都开启之后,才能保证消息的完整安全性

  1. 交换机持久化

RebbitMQ 中的交换机默认不持久,重启之后就会丢失

SpringAMQP 中声明的交换机 默认是 持久化的

  1. 队列持久化

RebbitMQ 中的队列默认不持久,重启之后就会丢失

SpringAMQP 中声明的队列 默认是 持久化的

  1. 消息持久化

SpringAMQP 发送的任何消息 默认是 持久化的

  1. 消费者确认机制

RebbitMQ 消息是阅后即焚机制,MQ 确认消息被消费者消费后会立刻删除,所以MQ也可以通过获取消费者返回的 ACK 来确认消费者是否已经处理消息

SpringAMQP 提供了三种确认模式

  1. manual : 手动ACK,根据业务情况,调用 API 发送 ACK

  1. auto : 自动 ACK ,由Spring 检测 istener 代码是否出现异常,无异常返回ACK ,有异常则返回 NACK

  1. none : 关闭ACK ,MQ 认为消息每次都成功 处理,因此消息投递之后,立即被删除

一般情况下,使用 auto 就可以了

在 消费者 服务 的application.yml 中配置

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none / manual / auto  # 模式
  1. 失败重试机制

问题 : 当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力

解决:

  1. 本地调试(基于Spring Retry 机制),

在消费者服务 的 application.yml配置

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数,达到最大次数后,返回ACK,丢弃消息
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重试失败策略:

问题:失败重试机制中,如果达到最大重试次数后,依然失败,消息就会被丢弃,无法真正保证消息可靠性,

解决: RepublishMessageRecoverer,失败的消息投递到一个指定的队列,由人工处理

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate)
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");

可靠性总结:

如何确保RabbitMQ消息的可靠性?

  • 开启生产者确认机制,确保生产者的消息能到达队列

  • 开启持久化功能,确保消息未消费前在队列中不会丢失

  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack

  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理


死信交换机

什么是死信?

当一个队列中的信息满足下列情况之一时,就可以称为死信(dead letter)

  • 消费者使用basic.reject 或 basic.nack声明消费失败,并且消息的requeue 参数设置为false

  • 消息是一个过期消息,超时无人消费

  • 要投递的消息队列消息满了,无法投递

如果这个半酣私信的队列配置了 dead-letter-exchange 属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称之为死信交换机(dead letter exchange 简称DLX)

在失败重试策略中,默认的 RejectAndDontRequeueRecoverer 会在本地重试次数耗尽后,发送 reject 给RebbitMQ 消息变成死信, 可以给消息队列添加一个死信交换机,并绑定一个队列,用来存放死信,这样可以防止死信被丢弃

投递死信到死信交换机时,必须声明 死信交换机的名称,死信交换机与死信队列绑定的RoutingKey

死信总结

什么样的消息会成为死信?

  • 消息被消费者reject或者返回nack

  • 消息超时未消费

  • 队列满了

死信交换机的使用场景是什么?

  • 如果队列绑定了死信交换机,死信会投递到死信交换机;

  • 可以利用死信交换机收集所有消费者处理失败的消息(死信),交由人工处理,进一步提高消息队列的可靠性。

TTL (消息超时时间)

超时未消费的消息会成为死信,超时的两种情况:

  1. 消息所在的队列设置了超时时间

  1. 消息本身设置了超时时间

消息超时的两种方式:

  1. 给队列设置 ttl 属性,进入队列后超过ttl 时间的消息变成死信

  1. 给消息设置ttl 属性,队列接收到消息超过ttl 事件后变成死信

实现发送一个消费者20秒后才能收到的消息的流程:

  • 给消息的目标队列指定死信交换机

  • 将消费者监听的队列绑定到死信交换机

  • 发送消息时,给消息设置超时时间为20 秒

延迟队列 ( Delay Queue)

对消息设置 ttl 属性,并结合死信交换机,实现了消息延迟发送,这种消息模式称为延迟队列模式

常见的场景:

  1. 延迟发送短信,

  1. 用户下单,如果用户在指定时间内欸支付,则自动取消

  1. 预约工作会议,指定时间后自动通知所有参会人员

DelayExchange (延迟消息插件 )

DelayExchange 需要将一个交换机 声明为 delayed 类型,当我们发送消息到delayExchange 时,

流程:

  • 接收消息

  • 判断消息是否具有 x-delay 属性

  • 如果有 x-delay 属性,说明是延迟消息,持久化到硬盘,读取 x-delay 值,作为延迟时间

  • 返回 routing not found 结果给消息发送者

  • x-delay 时间到期后,重新投递消息到指定队列

延迟队列的使用步骤:

  1. 声明一个交换机,添加 delayed 属性为true

  1. 发送消息时,添加 x-delay 头,值为 超时时间

惰性队列(lazy queue)

消息堆积:由于 消息的投递 > 消息的处理 ,导致队列存储达到上限,之后发送的消息就会成为死信,可能会被丢弃.

解决方案:

  1. 增加更多的消费者,提高消费速度,也就是 工作队列( work queue)

  1. 扩大队列的存储上限,提高消息的存储量

提高队列的消息存储量,必然会对内存造成压力,所以开始有了 惰性队列

惰性队列 从 RabbitMQ 3.6 版本开始加入,特征如下:

  1. 接收到消息后直接存入磁盘,而不是内存

  1. 消费者要消费消息时,才会从磁盘中读取并加载到内存中,

  1. 支持数百万条的消息存储

声明方式:

  • 通过命令行的方式,将正在运行中的队列修改为惰性队列

  • 基于@Bean 声明队列时,指定属性 lazy

惰性队列-总结

消息堆积的解决方式:

  1. 队列上绑定多个消费者,提高消费速度

  1. 使用惰性队列,可以在队列中保存更多的消息

惰性队列的优点:

  1. 基于磁盘存储,消息上限高

  1. 没有间歇性的 page-out ,性能比较稳定

缺点:

  1. 基于磁盘存储,消息的时效性会降低

  1. 性能受限于磁盘的IO

MQ 集群

基于 MQ的语种是面向并发的语言 Erlang ,其本身就支持集群模式

模式有:

  1. 普通集群

  1. 镜像集群

普通集群

这是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力

特征:

  • 会在集群的各个节点之间共享部分数据,包括交换机,队列辕信息,不包含队列中的消息

  • 当访问集群某节点时,如果队列不再该节点,会从数据所在节点传递到当前节点并返回

  • 队列所在的节点宕机,队列中的消息就会丢失

镜像集群

基于主从模式

特征:

  • 交换机,队列,队列中的消息会在各个MQ的镜像节点之间同步备份

  • 创建队列的节点被称为该队列的主节点,备份到的其他节点叫做该系列的镜像节点

  • 一个队列的主节点可能是另一个队列的镜像节点

  • 所有操作都是主节点完成,然后同步给镜像节点

  • 主机宕机后,镜像节点hi代替成心的主节点

仲裁队列

3.8版本之后加入,用以代替镜像队列,

特征:

  1. 与镜像队列一样,基于主从模式,支持主从数据同步

  1. 使用简单,没有复杂的配置

  1. 主从同步基于 Raft 协议,强一致

消息幂等性

幂等性:同一个消息被消费一次 或 多次,结果一样

消息可能会被重复消费,所以我们要解决消息幂等性的问题

消息重复消费是怎样产生的:

1,发送端MQ-client将消息发送给服务端MQ-server

2,服务端MQ-server 将消息存入内存

3,服务端MQ-server返回ACK给发送端MQ-client

4,服务端MQ-server将消息发给接收端 MQ-client

5,接收端MQ-client返回ACK给服务端

6,服务端MQ-server将消费的消息删除

如果第3步,和第5步,出现了问题,导致接收者(MQ-server)没有收到ACK

那么第3步的接收者和第5步的接收者会重新发送(retry)同样的消息,导致重复消费

保证幂等性,要保证同样的消息消费一次或多次的效果一样

解决方案:

消息消费者根据消息的种类进行不同的处理

1,查询操作,天然幂等性,不需要做特殊处理

2,根据id 删除,天然幂等,不需要做特殊处理

3,

  1. 根据 id 做等值(name=张三)修改,天然幂等性,不需要特殊处理

  1. 如果时做增加或者减少的修改(库存+1,金额-100),不幂等,使用乐观锁,

发消息时携带一个版本号 version 要跟数据库里面的 version 字段值一致

做修改时根据id 和version作为条件进行修改,并且将 version +1

比如一个 id=1 version=1 的商品,需要增加2的库存,发消给商品服务,

商品服务接收到消息后,生成的sql:update goods set total=total +2,version=version+1 where version =1and id=1

4,新增

在消息生产者就生成数据的ID,然后再消费者新增之前先根据 id 去数据库中查询,如果没有则新增,如果有,则说明该消息是一个重复消息

用分布式锁:

发消息时将消息的id 存入redis

消费消息时,先去redis 查询有没有对应的 id ,如果没有则不消费,消费成功后删除 redis 中

合格消息的id

消息顺序消费

什么是消息顺序消费?

如果有一个业务场景一共有3个消息,需要按照顺序依次消费1,2,3号消息

1.消息生产者必须保证将有顺序的消息发到同一个队列

2.消息消费者必须是一个线程来消费消息

3.设置消费者每次取一个消息xiao

以上是关于RabbitMQ-高级的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ 参数们的Power “续”

RabbitMQ——高级特性(SpringBoot实现)

RabbitMQ学习(下)——发布确认高级幂等性优先级惰性和RabbitMQ集群

RabbitMQ RabbitMQ高级整合应用

RabbitMQ RabbitMQ高级整合应用

RabbitMQ高级进阶(2021.05.29)