32 Consumer消息零丢失方案:手动提交offset + 自动故障转移

Posted 鮀城小帅

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了32 Consumer消息零丢失方案:手动提交offset + 自动故障转移相关的知识,希望对你有一定的参考价值。

1. 消费者(红包系统)丢失消息的问题

前面两章中,阐述了如何确保订单系统发送出去的消息一定会到达MQ中,而且也能确保了如果消息到达了MQ如何确保一定不会丢失。

在整个消息的生产消费中,就剩下消费者这一端的问题了。

红包系统(消费者)拿到消息后,一定可以成功的派发红包吗?

如果红包系统已经拿到了这条消息,但是消息目前还在它的内存里,还没执行派发红包的逻辑,此时他就直接提交了该消息的offset到broker去说自己已经处理过了。

 在以上的场景中,一旦红包系统在执行派发红包逻辑之前就崩溃了,内存里的消息就没了,红包也没派发出去,结果Broker已经收到它提交的消息offset了,还以为它已经处理完这条消息了。

等红包系统(消费者)重启的时候,就不会再次消费这条消息了。

这里说明了,消费者在获取到消息之后还是可能会丢失消息的。

2.Kafka消费者的数据丢失问题

RocketMQ的各种技术思想,在Kafka等中间件中也是适用的。

在Kafka中,由于Kafka的消费者采用的消费的方式跟RocketMQ不同,如果按照Kafka的消费模式,就会产生数据丢失的风险。

Kafka消费者在拿到一批消息,还没来得及处理时,就提交offset到broker去了,一旦在业务逻辑执行前消费者系统就挂掉了,这批消息就再也没机会处理了,因为它重启后不会再次获取提交过offset的消息。

3.RocketMQ消费者的不同之处

RocketMQ的消费者和Kafka的消费者有较大不同。

下列是RocketMQ消费者的代码:

重点看这里的小块内容:

 RocketMQ的消费者中会注册一个监听器,也就是 MessageListenerConcurrently 这个东西,当你的消费者获取到一批消息之后,就会回调这个监听器函数,让你来处理这一批消息。

然后当你处理完毕之后,才会返 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 作为消费成功的示意,告诉RocketMQ,这批消息我已经处理完毕了。

解决消费者丢失消息的思路是,只要你的红包系统(消费者)是在这个监听器的函数中先处理一批消息,基于这批消息都派发完了红包,然后返回了消费成功的状态,接着才会去提交这批消息的offset到broker去。

在这种情况下,当你对一批消息都处理完毕了,然后再提交消息的offset给broker,接着红包系统(消费者)崩溃了,此时是不会丢失消息的。

 在这种机制下,如果消费者获取到一批消息之后,还没处理完,也就是没返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS 这个状态,同时也就是说没提交这批消息的offset给broker的时候,消费者突然挂了。

在上面说的那种情况下,消费者对这一批消息都没提交它的offset给broker的话,broker就不会认为你已经处理完了这批消息,此时消费者的一台机器宕机了,broker会感知到你的红包系统(消费者)机器作为一个Consumer挂了。

就会把你没处理完的那批消息叫个红包系统的其他机器去进行处理,所以在这种情况下,消息是绝对不会丢失的。

4. 关键点:不能异步消费消息

 只有在同步模式下,才能是必须处理完一批消息了,才会返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS状态表示消息表示处理结束了,去提交offset到broker去。

而在异步模式下,如果在代码中对消息进行异步的处理,比如开启了一个子线程去处理该批消息,然后启动线程之后,就直接返回了ConsumeConcurrentlyStatus.CONSUME_SUCCESS状态了。而此时子线程可能还没处理完业务,却已经返回标识状态并提交了offset。此时一旦红包系统(消费者)宕机了,MQ会认为已经消费完成,而红包系统由于未执行完成派发红包业务,就会导致消息的丢失。

以上是关于32 Consumer消息零丢失方案:手动提交offset + 自动故障转移的主要内容,如果未能解决你的问题,请参考以下文章

31 Broker消息零丢失方案:同步刷盘 + Raft协议主从同步

kafka 提交offset

33 基于RocketMQ设计的全链路消息零丢失方案总结

kafka consumer offset机制

29 除了使用事务消息方案,还有什么解决发送消息零丢失的方案

27 发送消息零丢失方案:RocketMQ事务消息的实现流程分析