你的消息队列如何保证消息不丢失,且只被消费一次,这篇就教会你

Posted Hollis

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了你的消息队列如何保证消息不丢失,且只被消费一次,这篇就教会你相关的知识,希望对你有一定的参考价值。

我们将消息队列这个组件加入到了我们的商城系统里,并且通过秒杀这个实际的案例进行了实际演练 ,知道了它对高并发写流量做削峰填谷,对非关键业务逻辑做异步处理,对不同的业务系统做解耦合。
场景:
现在我们的电商系统中上了一个新产品发红包的功能,即当用户在我们商城消费了一定的额度之后,我们系统就给用户发送一个现金红包,用来答谢用户并且促进用户消费。

前面我们说到过,由于这个发红包的动作并不属于当前下单的主流程,所以我们就使用消息队列来异步处理。这个时候,就会有个隐藏问题:
  • 另一个问题,就是如果我们将消息重复发送了,那么用户就会得到两个红包,这样会造成我们公司的损失。


所以,现在我们要确保,系统生产的消息一定要被消费到,并且只能被消费一次,这个到底该怎么做呢?接下来,我们就来深入学习下。



01 为何消息会丢失?

要想保证消息只被消费一次,那么首先就得要保证消息不丢失。我们先来看看,消息从被写入消息队列,到被消费完成,这整个链路上会有哪些地方可能会导致消息丢失?我们不难看出,其实主要有三个地方:
  • 消息从生产者到消息队列的过程。

  • 消息在消息队列存储的过程。

  • 消息在被消费的过程。

你的消息队列如何保证消息不丢失,且只被消费一次,这篇就教会你


如上,我们分析了共有 3 个消息可能丢失的地方,接下来,我们就具体来分析下每一种情况。


1. 消息在写到消息队列的过程中丢失
消息生产者一般就是业务系统,消息队列是单独部署了在独立的服务器上的,所以业务服务器和消息队列服务器可能会出现网络抖动,当出现了网络抖动,消息就会丢失。


一般这种情况,我们可以采用消息重传的方案,即当我们发现发送的消息超时后,我们就重新发送一次,但是不能一直无限制的重传消息。按照经验来说,如果不是消息队列本身故障,或者是网络断开了,一般重试个 2 到 3 次就行了。


但是,这种方案就有可能造成消息的重复,这样就会导致消费者消费到重复的消息。


例如,消息发送到消息队列中,但是由于消息队列处理消息较慢或者网络抖动,这个时候,其实消息是写入成功的,但是对于生产端就认为超时了,那么生产者就会重传当前消息,则会出现消息重复。对于我们上面案例中,就是用户会收到两个红包。


2. 消息在消息队列中丢失

即使消息发送到了消息队列,消息也不会万无一失,还是会面临丢失的风险。


我们以 Kafka 为例,消息在Kafka 中是存储在本地磁盘上的, 为了减少消息存储对磁盘的随机 I/O,一般我们会将消息写入到操作系统的 Page Cache 中,然后在合适的时间将消息刷新到磁盘上。


例如,Kafka 可以配置当达到某一时间间隔,或者累积一定的消息数量的时候再刷盘,也就是所谓的异步刷盘


不过,如果发生机器掉电或者机器异常重启,那么 Page Cache 中还没有来得及刷盘的消息就会丢失了。那么怎么解决呢?你可能会把刷盘的间隔设置很短,或者设置累积一条消息就就刷盘。


但这样频繁刷盘会对性能有比较大的影响,而且从经验来看,出现机器宕机或者掉电的几率也不高,所以我不建议你这样做。

你的消息队列如何保证消息不丢失,且只被消费一次,这篇就教会你
如果你的电商系统对消息丢失的容忍度很低,那么你可以考虑以集群方式部署 Kafka 服务,通过部署多个副本备份数据,保证消息尽量不丢失。


那么它是怎么实现的呢?Kafka 集群中有一个 Leader 负责消息的写入和消费,可以有多个 Follower 负责数据的备份。Follower 中有一个特殊的集合叫做 ISR(in-sync replicas),当 Leader 故障时,新选举出来的 Leader 会从 ISR 中选择,默认 Leader 的数据会异步地复制给 Follower,这样在 Leader 发生掉电或者宕机时,Kafka 会从 Follower 中消费消息,减少消息丢失的可能。


由于默认消息是异步地从 Leader 复制到 Follower 的,所以一旦 Leader 宕机,那些还没有来得及复制到 Follower 的消息还是会丢失。


为了解决这个问题,Kafka 为生产者提供一个选项叫做“acks”,当这个选项被设置为“all”时,生产者发送的每一条消息除了发给 Leader 外还会发给所有的 ISR,并且必须得到 Leader 和所有 ISR 的确认后才被认为发送成功。这样,只有 Leader 和所有的 ISR 都挂了,消息才会丢失。

你的消息队列如何保证消息不丢失,且只被消费一次,这篇就教会你


从上面这张图来看,当设置“acks=all”时,需要同步执行 1,3,4 三个步骤,对于消息生产的性能来说也是有比较大的影响的,所以你在实际应用中需要仔细地权衡考量。这里建议是:

  1. 如果你需要确保消息一条都不能丢失,那么建议不要开启消息队列的同步刷盘,而是需要使用集群的方式来解决,可以配置当所有 ISR Follower 都接收到消息才返回成功。

  2. 如果对消息的丢失有一定的容忍度,那么建议不部署集群,即使以集群方式部署,也建议配置只发送给一个 Follower 就可以返回成功了。

  3. 我们的业务系统一般对于消息的丢失有一定的容忍度,比如说以上面的红包系统为例,如果红包消息丢失了,我们只要后续给没有发送红包的用户补发红包就好了。



3. 在消费的过程中存在消息丢失的可能

还是以 Kafka 为例来说明。一个消费者消费消息的进度是记录在消息队列集群中的,而消费的过程分为三步:接收消息、处理消息、更新消费进度。


这里面接收消息和处理消息的过程都可能会发生异常或者失败,比如说,消息接收时网络发生抖动,导致消息并没有被正确的接收到;处理消息时可能发生一些业务的异常导致处理流程未执行完成,这时如果更新消费进度,那么这条失败的消息就永远不会被处理了,也可以认为是丢失了。


所以,在这里你需要注意的是,一定要等到消息接收和处理完成后才能更新消费进度,但是这也会造成消息重复的问题,比方说某一条消息在处理之后,消费者恰好宕机了,那么因为没有更新消费进度,所以当这个消费者重启之后,还会重复地消费这条消息。



02 如何保证消息只被消费一次

从上面的分析中,你能发现,为了避免消息丢失,我们需要付出两方面的代价:一方面是性能的损耗;一方面可能造成消息重复消费。


性能的损耗我们还可以接受,因为一般业务系统只有在写请求时才会有发送消息队列的操作,而一般系统的写请求的量级并不高,但是消息一旦被重复消费,就会造成业务逻辑处理的错误。那么我们要如何避免消息的重复呢?


想要完全的避免消息重复的发生是很难做到的,因为网络的抖动、机器的宕机和处理的异常都是比较难以避免的,在工业上并没有成熟的方法,因此我们会把要求放宽,只要保证即使消费到了重复的消息,从消费的最终结果来看和只消费一次是等同的就好了,也就是保证在消息的生产和消费的过程是“幂等”的。


1. 什么是幂等

幂等是一个数学上的概念,它的含义是多次执行同一个操作和执行一次操作,最终得到的结果是相同的,说起来可能有些抽象,我给你举个例子:


比如,男生和女生吵架,女生抓住一个点不放,传递“你不在乎我了吗?”(生产消息)的信息。那么当多次埋怨“你不在乎我了吗?”的时候(多次生产相同消息),她不知道的是,男生的耳朵(消息处理)会自动把 N 多次的信息屏蔽,就像只听到一次一样,这就是幂等性。


如果我们消费一条消息的时候,要给现有的库存数量减 1,那么如果消费两条相同的消息就会给库存数量减 2,这就不是幂等的。而如果消费一条消息后,处理逻辑是将库存的数量设置为 0,或者是如果当前库存数量是 10 时则减 1,这样在消费多条消息时,所得到的结果就是相同的,这就是幂等的。


说白了,你可以这么理解“幂等”:一件事儿无论做多少次都和做一次产生的结果是一样的,那么这件事儿就具有幂等性。


2. 在生产、消费过程中增加消息幂等性的保证

消息在生产和消费的过程中都可能会产生重复,所以你要做的是,在生产过程和消费过程中增加消息幂等性的保证,这样就可以认为从“最终结果上来看”,消息实际上是只被消费了一次的。


在消息生产过程中,在 Kafka0.11 版本和 Pulsar 中都支持“producer idempotency”的特性,翻译过来就是生产过程的幂等性,这种特性保证消息虽然可能在生产端产生重复,但是最终在消息队列存储时只会存储一份。


它的做法是给每一个生产者一个唯一的 ID,并且为生产的每一条消息赋予一个唯一 ID,消息队列的服务端会存储 < 生产者 ID,最后一条消息 ID> 的映射。当某一个生产者产生新的消息时,消息队列服务端会比对消息 ID 是否与存储的最后一条 ID 一致,如果一致,就认为是重复的消息,服务端会自动丢弃。

而在消费端,幂等性的保证会稍微复杂一些,你可以从通用层和业务层两个层面来考虑。


你可以看到,无论是生产端的幂等性保证方式,还是消费端通用的幂等性保证方式,它们的共同特点都是为每一个消息生成一个唯一的 ID,然后在使用这个消息的时候,先比对这个 ID 是否已经存在,如果存在,则认为消息已经被使用过。


所以这种方式是一种标准的实现幂等的方式,你在项目之中可以拿来直接使用,它在逻辑上的伪代码就像下面这样:

boolean isIDExisted = selectByID(ID); // 判断ID是否存在if(isIDExisted) {return; //存在则直接返回} else {process(message); //不存在,则处理消息saveID(ID); //存储ID}


不过这样会有一个问题:如果消息在处理之后,还没有来得及写入数据库,消费者宕机了重启之后发现数据库中并没有这条消息,还是会重复执行两次消费逻辑。

这时你就需要引入事务机制,保证消息处理和写入数据库必须同时成功或者同时失败,但是这样消息处理的成本就更高了,所以,如果对于消息重复没有特别严格的要求,可以直接使用这种通用的方案,而不考虑引入事务。


在业务层面怎么处理呢?这里有很多种处理方式,其中有一种是增加乐观锁的方式。比如,你的消息处理程序需要给一个人的账号加钱,那么你可以通过乐观锁的方式来解决。


具体的操作方式是这样的:你给每个人的账号数据中增加一个版本号的字段,在生产消息时先查询这个账户的版本号,并且将版本号连同消息一起发送给消息队列。消费端在拿到消息和版本号后,在执行更新账户金额 SQL 的时候带上版本号,类似于执行:


update user set amount = amount + 20, version=version+1 where userId=1 and version=1;


你看,我们在更新数据时给数据加了乐观锁,这样在消费第一条消息时,version 值为 1,SQL 可以执行成功,并且同时把 version 值改为了 2;在执行第二条相同的消息时,由于 version 值不再是 1,所以这条 SQL 不能执行成功,也就保证了消息的幂等性。


总结,今天我们主要学习了在消息队列中,消息可能会发生丢失的场景,和我们的应对方法,以及在消息重复的场景下,我们要如何保证,尽量不影响消息最终的处理结果。



有道无术,术可成;有术无道,止于术


好文章,我在看❤️

以上是关于你的消息队列如何保证消息不丢失,且只被消费一次,这篇就教会你的主要内容,如果未能解决你的问题,请参考以下文章

消息队列重复消费和数据丢失问题(石衫面试突击学习笔记)

关于MQ中的“有且只消费一次”误导了很多人。分布式锁安全吗?

如何保证消息队列的可靠性传输?

2020-12-25:MQ中,如何保证消息的顺序性?

各种消息队列如何选择?为何选择rocketmq来保证消息不丢失,及应该采用rocketmq哪种通信模式?

各种消息队列如何选择?为何选择rocketmq来保证消息不丢失,及应该采用rocketmq哪种通信模式?