如何解决 顺序消费重复消费
Posted 黑夜-SO
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何解决 顺序消费重复消费相关的知识,希望对你有一定的参考价值。
文章目录
前言
其实,这些问题不仅仅挂钩于 RocketMQ 、kafka,而是应该每个消息中间件都需要去解决的。
其实 Kafka 的架构基本和 RocketMQ 类似,只是它注册中心使用了 Zookeeper 、它的 分区 就相当于 RocketMQ 中的 队列 。还有一些小细节不同会在后面提到。
如何进行顺序消费
在上面的技术架构介绍中,我们已经知道了 RocketMQ 在主题上是无序的、它只有在队列层面才是保证有序 的。
这又扯到两个概念——普通顺序 和 严格顺序 。
所谓普通顺序是指 消费者通过 同一个消费队列收到的消息是有顺序的 ,不同消息队列收到的消息则可能是无顺序的。普通顺序消息在 Broker 重启情况下不会保证消息顺序性 (短暂时间) 。
所谓严格顺序是指 消费者收到的 所有消息 均是有顺序的。严格顺序消息 即使在异常情况下也会保证消息的顺序性 。
但是,严格顺序看起来虽好,实现它可会付出巨大的代价。如果你使用严格顺序模式,Broker 集群中只要有一台机器不可用,则整个集群都不可用。你还用啥?现在主要场景也就在 binlog 同步。
一般而言,我们的 MQ 都是能容忍短暂的乱序,所以推荐使用普通顺序模式。
那么,我们现在使用了 普通顺序模式 ,我们从上面学习知道了在 Producer 生产消息的时候会进行轮询(取决你的负载均衡策略)来向同一主题的不同消息队列发送消息。那么如果此时我有几个消息分别是同一个订单的创建、支付、发货,在轮询的策略下这 三个消息会被发送到不同队列 ,因为在不同的队列此时就无法使用 RocketMQ 带来的队列有序特性来保证消息有序性了。
那么,怎么解决呢?
其实很简单,我们需要处理的仅仅是将同一语义下的消息放入同一个队列(比如这里是同一个订单),那我们就可以使用 Hash取模法 来保证同一个订单在同一个队列中就行了。
如何防止重复消费
这个问题其实就两个字—— 幂等 。在编程中一个幂等 操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。比如说,这个时候我们有一个订单的处理积分的系统,每当来一个消息的时候它就负责为创建这个订单的用户的积分加上相应的数值。可是有一次,消息队列发送给订单系统 FrancisQ 的订单信息,其要求是给 FrancisQ 的积分加上 500。但是积分系统在收到 FrancisQ 的订单信息处理完成之后返回给消息队列处理成功的信息的时候出现了网络波动(当然还有很多种情况,比如Broker意外重启等等),这条回应没有发送成功。
那么,消息队列没收到积分系统的回应会不会尝试重发这个消息?问题就来了,我再发这个消息,万一它又给 FrancisQ 的账户加上 500 积分怎么办呢?
所以我们需要给我们的消费者实现 幂等 ,也就是对同一个消息的处理结果,执行多少次都不变。
那么如何给业务实现幂等呢?这个还是需要结合具体的业务的。你可以使用 写入 Redis 来保证,因为 Redis 的 key 和 value 就是天然支持幂等的。当然还有使用 数据库插入法 ,基于数据库的唯一键来保证重复数据不会被插入多条。
不过最主要的还是需要 根据特定场景使用特定的解决方案 ,你要知道你的消息消费是否是完全不可重复消费还是可以忍受重复消费的,然后再选择强校验和弱校验的方式。毕竟在 CS 领域还是很少有技术银弹的说法。
而在整个互联网领域,幂等不仅仅适用于消息队列的重复消费问题,这些实现幂等的方法,也同样适用于,在其他场景中来解决重复请求或者重复调用的问题 。比如将HTTP服务设计成幂等的,解决前端或者APP重复提交表单数据的问题 ,也可以将一个微服务设计成幂等的,解决 RPC 框架自动重试导致的 重复调用问题 。
面试官杠上重复消费、消息堆积、消息丢失、顺序消息?
参考技术A消息队列在互联网技术存储方面使用如此广泛,几乎所有的后端技术面试官都要在消息队列的使用和原理方面对小伙伴们进行360 的刁难。
什么,这么多问题啊!别慌,现在就来找找解决方案。
一、 重复消费
现在消息队列一般都能保证at least once的,也就是消息至少一次投递。 在这种情况为什么会出现重复消费的问题呢?通常都是由于网络原因造成的 ,原因如下:通常消息被成功消费后消费者都会发送一个成功标志给MQ,MQ收到这个标志就表示消息已经成功消费了,就不会再发送给其他消费者了。但是如果因为网络这个标志没有送到MQ就丢失了,MQ就认为这个消息没有被成功消费,就会再次发送给其他消费者消费,就造成重复了。
这时我们看这个问题就变成了我们怎么保证消费端的幂等性。
幂等性 是指一个操作其执行任意多次所产生的影响均与一次执行的影响相同,大白话就是你同样的参数调用我这个接口,调用多少次结果都相同。
怎么保证消息队列消费的幂等性
其实还是得结合业务来思考,我在这里给出几个解决方案:
1. 分布式锁 。生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后就是这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。
2.唯一键防重 。基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。
3.先查后写。 要写数据库前,先根据主键查一下,如果这数据都有了,你就别插入了,update一下好了。
4. 关闭重试机制 。如果把重试机制关掉的话不显示,虽然解决了重复消费的问题,但是可能会造成丢失消息,不建议这么做。
不同的业务可以选择不同的方案,如果服务的并发量不高,可以考虑唯一键防重或者先查后写的方案;如果并发量较高,追求性能,沐子推荐采用分布式锁实现幂等性(本公司目前采用的方案)
二、 消息堆积
1. 消息堆积的产生原因
消息堆积的原因主要在于两方面,其一为消费的太慢或消费方出现异常,其二为生产方生产的太快,总的来说就是 消息的速度赶不上生产的速度,生产和消费速度不匹配造成的 。
2. 消息堆积的解决方案
1)生产端:一般当生产端发生积压(Broker正常的情况下)就要查看你的业务逻辑是否有异常的耗时步骤导致的,是否需要改并行化操作等。
Broker端:当Broker端发生积压我们首先要查看,消息队列内存使用情况, 如果有分区的的话还得看每个分区积压的消息数量差异。当每个分区的消息积压数据量相对均匀的话,我们大致可以认为是流量激增。需要在消费端做优化,或者同时需要增加Broker节点(相当于存储扩容),如果分区加压消息数量差异很大的话(有的队列满了,有的队列可能还是空闲状态),我们这时候就要检查我们的路由转发规则是否合理。
2) 增加消费者 ,多部署几台消费者机器(横向扩展),提升消费者的消费能力。
3)此种情况可以将这些 消费不成功的消息转发到其它队列里去(类似死信队列) ,后面再慢慢分析死信队列里的消息处理问题。
4) mq 中的消息过期失效了。可以采取一个方案,就是批量重导 ,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。
总之,上面说到消息积压的问题,我们需要查看是否有无限重发的消息或者有进入死锁的程序等等,当确定是流量激增的话,我们需要评估是否需要增加资源还是通过限流的方式解决,当短时间大量消息需要处理时,在资源允许的情况下,我们可以新启一批消费者与消息队列,将原来的消费者中的消息直接作为生产者转发到临时应急队列中,这样大概率的能够快速解决消息积压。与其事后处理不如我们在设计之初就要把积压考虑进来,对于数据量非常大,但是实时性要求不高的场景,可以设计出批量消息发送,当队列积累到一定阀值再做批量消费消费,这里需要注意的就是重复消费带来的影响,设计不好就是一场灾难。
三、 消息丢失
一般来讲消息丢失的途径有三个: 生产者弄丢数据、消息队列弄丢数据、消费者弄丢数据 。
1. 生产者弄丢数据
a、丢失的原因:因为网络传输的不稳定性,当生产者在向MQ发送消息的过程中,MQ没有成功接收到消息,但是生产者却以为MQ成功接收到了消息,不会再次重复发送该消息,从而导致消息的丢失。
b、解决办法:有两个解决办法,第一个方法: 向broker发送消息时,如果由于网络抖动等原因导致消息发送失败,可以设置 失败重试次数让消息重发 。
第二个方法: 事务机制和confirm机制,最常用的是confirm机制 ;
事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 MQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。
2. MQ弄丢数据
a、丢失的原因:MQ接收到生产者发送过来的消息,是存在内存中的,如果没有被消费完,此时MQ宕机了,那么再次启动的时候,原来内存中的那些消息都丢失了。
b、解决办法: 开启MQ的持久化 。结合上面的说到的confirm机制,只有当消息成功持久化磁盘之后,才会回调生产者的接口返回ack消息,否则都算失败,生产者会重新发送。存入磁盘的消息不会丢失,就算MQ挂掉了,重启之后,他会读取磁盘中的消息,不会导致消息的丢失。
注意,哪怕是你给 MQ 开启了持久化机制,也有一种可能,就是这个消息写到了MQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时MQ挂了,就会导致内存里的一点点数据丢失。
所以,持久化可以跟生产者那边的 confirm 机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者 ack 了,所以哪怕是在持久化到磁盘之前,MQ 挂了,数据丢了,生产者收不到 ack,你也是可以自己重发的。
3. 消费者弄丢数据
a、丢失的原因:如果MQ成功的把消息发送给了消费者,那么MQ的ack机制会自动的返回成功,表明发送消息成功,下次就不会发送这个消息。但如果就在此时,消费者还没处理完该消息,然后宕机了,那么这个消息就丢失了。
b、解决的办法:1)简单来说,就是必须关闭 MQ 的自动提交,把 自动提交改为手动提交 ,也就是说当我消费成功后才会进行提交。
2)消费者端已经正常接收到消息但是在执行后续消息处理时发生了异常,最终返回处理失败。 重试-进行重新消费问题,如果一直这样重复消费都持续失败到一定次数,可以投递到 DLQ 死信队列,应用可以监控死信队列来做人工干预 。
四、 顺序消费
比如一个电商的下单操作,下单后先减库存然后生成订单,这个操作就需要顺序执行的。队列本身是有顺序的,但是为什么还要保证顺序消费呢,主要是因为生产环境服务实例一般都是集群,当消费者是多个实例时,队列中的消息会分发到所有实例进行消费(同一个消息只能发给一个消费者实例),这样就不能保证消息顺序的消费,因为你不能确保哪台机器执行消费端业务代码的速度快。
保证每次只有单个消费实例消费
所以对于需要保证顺序消费的业务,我们可以只部署一个消费者实例,然后设置MQ 每次只推送一个消息,再开启手动 ack 即可。这样MQ 每次只会从队列推送一个消息过来,处理完成之后我们 ack 回应,再消费下一个,就能确保消息顺序性。
这样MQ 每次只会从队列推送一个消息过来,处理完成之后我们 ack 回应,再消费下一个,就能确保消息顺序性。
但是这样的操作也会降低消费者的性能, 一个消费者消费消息时,其他消费者会阻塞,所以很多场景下可能并不会采用这样的方案。
所以一般会根据场景,制定一定的策略来解决消费顺序问题。
多线程并发抢占出现消费乱序问题
当MQ采用简单队列模式的时候,如果消费者采用多线程的方式来加速消息的处理,此时也会出现消息乱序的问题。
多线程并发抢占出现消费乱序问题,将消息ID进行hash计算,将相同值放入同一个内存队列,让指定线程执行,即可解决顺序消费问题。
在多个分区中保证消息顺序和消息处理效率
首先使用多个分区,消息可以被发送端发送至多个分区,保证消息发送的效率。然后在消费端在拉消息时使用ConutdownLunch来记录一组有序消息的个数。如果达到个数,说明已拉取到完整的一组有序消息。然后在消费端根据消息序号进行排序,消费端将排好序的消息发到内存队列(可以搞多个),一个内存队列开启一个线程顺序处理消息。即可最大程度上既保证顺序又保证效率!
RocketMQ作为阿里开源的一款高性能、高吞吐量的消息中间件,支持顺序消息,所以如果有这种场景或者要使用MQ,我建议你直接使用RocketMQ即可。
我们说了一些处理与分析问题的方法,这里有一个最重要的点就是我们需要有一套实用的监控发现工具或者方式,在问题第一时间发现才是王道,不然我们上面所说的都空谈,当问题发现的时候损失已经无法挽回。所以我们要在设计系统之初需要要为监控系统或者程序提供完备或者必须的日志,接口,数据等,这要才是一个合理的设计。当没有监控系统的情况下我们必须自己设计一套简单分析接口。
最后, 如果我的文章对你有所帮助或者有所启发, 欢迎关注公众号(微信搜索公众号:首席架构师专栏),里面有许多技术干货,也有我对技术的思考和感悟,还有作为架构师的验验分享;关注后回复 【面试题】,有我准备的面试题、架构师大型项目实战视频等福利 , 我会带着你一起学习、成长,让我们一起加油!!!
以上是关于如何解决 顺序消费重复消费的主要内容,如果未能解决你的问题,请参考以下文章