RabbitMQ管理台Purge大量消息堆积队列风险

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ管理台Purge大量消息堆积队列风险相关的知识,希望对你有一定的参考价值。

参考技术A

RMQ线上集群( v3.6.12 )purge一个大量消息堆积(100W+)的队列时,有可能导致客户端报大量发送超时异常。

purge 操作实际发生的事情( rabbit_amqqueue_process .erl):

handle_call(purge, _From, State = #qbacking_queue = BQ,
backing_queue_state = BQS) ->
Count, BQS1 = BQ:purge(BQS) ,
...

BQ默认对应 rabbit_variable_queue 模块:

msg_store_remove(MSCState, IsPersistent, MsgIds) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
fun (MCSState1) ->
rabbit_msg_store :remove(MsgIds, MCSState1)
end).

也就是purge操作,最终是由 rabbit_msg_store 进程来进行操作的。

rabbit_msg_store 进程,每个节点只有一个,因此容易成为瓶颈。

rabbitmq 进程间通信采用 credit_flow 机制,一般一条持久化消息发送到持久化队列的大致过程如下:

rabbit_reader → rabbit_channel → rabbit_amqqueue_process → rabbit_msg_store

当 rabbit_msg_store 进程忙于大量消息的purge操作时,不能及时处理其上游 rabbit_amqqueue_process 的消息,这会导致其上游 rabbit_amqqueue_process 很快耗光其 credit 值,从而造成 flow。同理,

当 rabbit_amqqueue_process 进程由于 flow 被 block 住,不能及时处理其上游 rabbit_channel 的消息,导致 rabbit_channel 很快耗光其 credit 值,也造成 flow。最终限流状态会一直追溯到最上层 connection。

此时客户端发送会卡顿(表现出等待confirm超时等异常)

因此, 对于线上有业务量的RMQ集群,如果有大量消息堆积的队列需要清理,最好不要直接purge,有可能对线上业务造成影响 。

最保险的清理方式,起 Consumers 消费(接收即丢弃)

线上业务集群节点 TcpExt.pruneCalled 指标报警,同时发现有一队列处于 flow 状态,进而判断持久化进程 rabbit_msg_store 出现瓶颈,通过 sar 工具发现以下异常:

最终定位到节点对应的宿主机底层IO有问题,及时进行规避。

Finding bottlenecks with RabbitMQ 3.3

Purge a large queue is slow

RabbitMQ常见疑难问题

一.消息队列消息堆积如何处理?

当消息产生的速度长时间远远大于消费的速度的时候,就会产生消息的堆积。

消息堆积的影响:
  • 1.新消息可能无法进入MQ。
  • 2.旧消息无法丢失。
  • 3.等待消费的时间过长,超出业务的容忍范围。
产生堆积可能的因素:
  • 1.生产者大量发布消息
  • 2.消费者消费失败,没有ack自动应答。
  • 3.消费者可能出现性能瓶颈。
  • 4.消费服务挂掉。
解决思路:
  • 1.排查消费者运行效率、性能问题。
  • 2.消费者多线程处理。
  • 3.增加更多的消费者。
处理过程:

新生产的消息转移到新的队列,增加新的服务器部署新的消费者来消费。
原来的消息队列的消息可以继续慢慢消费。

二、消息丢失怎么办?

消息流经过程 —>生产者—>队列—>消费者

1.生产者环节丢失消息

场景:生产者在投递消息过程中,突遇断网可能丢失消息。

解决方案:生产者在消息投递环节开启消息确认机制(事物或者comfirm),发送失败时候重试。

2.队列中丢失消息

场景:队列服务宕机、重启,消息丢失。

解决方案:投递消息的时候,交换机、队列、消息都设置为持久化durable=true

3.消费者中丢失消息

场景:消费者启用了自动应答,消费者服务挂掉。
解决方案:通道设置每次只处理一条消息,关闭消费者自动应答,消费完成手动应答。

三、有序消费

情形1:多条不幂等消息如消息1、消息2、消息3,采用worker模式投递,由于worker模式的消费者是竞争者关系,拿到的消息顺序是有问题的。

解决方案:将消息1、消息2、消息3分别投递进不同队列,每个队列对应一个消费者

情形2:消费者开启了多线程

解决方案:消息队列分配消息后,创建多个内存队列,每个线程消费者去消费内存队列里面的消息

四、消息重复消费

出现原因:消费者消息消息的时候,MQ没有收到消息的ack应答。

场景:
  • 1.消费者消费消息后没有ack。
  • 2.消费者在消费消息后,ack时网络异常。
解决步骤:
  1. 消费者消费后,记录通过缓存记录消息的消费标识,消息id如redis的setnx
  2. 如果消费成功且ack成功,则删除记录的消息标记。
  3. 如果ack失败,消息下次被消费消息时候,先去查询消息的消费标识,已经消费则直接ack,未消费则继续消费。
概念:

消息幂等性:消息无论怎么执行都是同样结果。
非幂等性案例:扣款、扣库存、添加数据。

以上是关于RabbitMQ管理台Purge大量消息堆积队列风险的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ常见疑难问题

物联网平台Rabbitmq实际应用rabbitMQ的使用场景?消息队列产生严重消息堆积怎么处理?

如何修改在rabbitmq queue中的消息

RabbitMQ:惰性队列

RabbitMQ入门教程——工作队列

MQ消息队列