跟我学RocketMQ之消息幂等
Posted RocketMQ官微
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了跟我学RocketMQ之消息幂等相关的知识,希望对你有一定的参考价值。
RocketMQ消息处理成功的标志是消费者消费一条消息后向Broker端发送ACK消息并且被Broker处理,如果由于网络等原因导致ACK丢失,则RocketMQ会重新消费该条消息。这里就涉及到了消息幂等的概念。
首先我们了解一下什么是幂等,以及何为消息幂等。
什么是幂等
百度对 “幂等” 解释如下
设f为一由X映射至X的一元运算,则f为幂等的,当对于所有在X内的x,f(f(x)) = f(x).特别的是,恒等函数一定是幂等的,且任一常数函数也都是幂等的。
这里的关键是 f(f(x)) = f(x), 翻译成通俗的解释就是:
如果有一个操作,多次执行与一次执行所产生的影响是相同的,我们就称这个操作是幂等的。
关于消息幂等
基于上述的概念,结合消息消费的场景,我们能够很容易的总结出消息幂等的概念:
如果消息重试多次,消费者端对该重复消息消费多次与消费一次的结果是相同的,并且多次消费没有对系统产生副作用,那么我们就称这个过程是消息幂等的。
例如:
支付场景下,消费者消费扣款消息,对一笔订单进行扣款操作,该扣款操作需要扣除10元。
这个扣款操作重复多次与执行一次的效果相同,只进行一次真实扣款,用户的扣款记录中对应该笔订单的只有一条扣款流水。不会多扣。那么我们就说这个扣款操作是符合要求的,这个消费过程是消息幂等的。
需要进行消息幂等的场景
首先我们回顾一下需要进行消息幂等的场景,也就是上一篇文章提到的消息重复的场景。
发送时重复:
生产者发送消息时,消息成功投递到broker,但此时发生网络闪断或者生产者down掉,导致broker发送ACK失败。此时生产者由于未能收到消息发送响应,认为发送失败,因此尝试重新发送消息到broker。当消息发送成功后,在broker中就会存在两条相同内容的消息,最终消费者会拉取到两条内容一样并且Message ID也相同的消息。因此造成了消息的重复。消费时重复:
消费消息时同样会出现重复消费的情况。当消费者在处理业务完成返回消费状态给broker时,由于网络闪断等异常情况导致未能将消费完成的CONSUME_SUCCESS状态返回给broker。broker为了保证消息被至少消费一次的语义,会在网络环境恢复之后再次投递该条被处理的消息,最终造成消费者多次收到内容一样并且Message ID也相同的消息,造成了消息的重复。
可以看到,无论是发送时重复还是消费时重复,最终的效果均为消费者消费时收到了重复的消息,那么我们就知道:只需要在消费者端统一进行幂等处理就能够实现消息幂等。
实现消息幂等
那么如何才能实现消息幂等呢?
首先我们要定义消息幂等的两要素:
幂等令牌
处理唯一性的确保
我们必须保证存在幂等令牌的情况下保证业务处理结果的唯一性,才认为幂等实现是成功的。
接下来分别解释这两个要素
幂等令牌
幂等令牌是生产者和消费者两者中的既定协议,在业务中通常是具备唯一业务标识的字符串,如:下单场景使用订单号、支付场景使用支付流水号等。且一般由生产者端生成并传递给消费者端。
处理唯一性的确保
即服务端应当采用一定的策略保证同一个业务逻辑一定不会重复执行成功多次。如:使用支付宝进行支付,买一个产品支付多次只会成功一笔。
较为常用的方式是采用缓存去重并且通过对业务标识添加数据库的唯一索引实现幂等。
具体的思路为:如支付场景下,支付的发起端生成了一个支付流水号,服务端处理该支付请求成功后,数据持久化成功。由于表中对支付流水添加了唯一索引,因此当重复支付时会因为唯一索引的存在报错 duplicate entry,服务端的业务逻辑捕获该异常并返回调用侧“重复支付”提示。这样就不会重复扣款。
在上面场景的基础上,我们还可以引入Redis等缓存组件实现去重:当支付请求打到服务端,首先去缓存进行判断,根据key=“支付流水号”去get存储的值,如果返回为空,表明是首次进行支付操作同时将当前的支付流水号作为key、value可以为任意字符串通过set(key,value,expireTime)存储在redis中。
当重复的支付请求到来时,尝试进行get(支付流水号)操作,这个操作会命中缓存,因此我们可以认为该请求是重复的支付请求,服务端业务将重复支付的业务提示返回给请求方。
由于我们一般都会在缓存使用过程中设置过期时间,缓存可能会失效从而导致请求穿透到持久化存储中(如:mysql)。因此不能因为引入缓存而放弃使用唯一索引,将二者结合在一起是一个比较好的方案。
RocketMQ场景下如何处理消息幂等
了解了两个要素及典型案例之后,我们回到消息消费的场景。
作为一款高性能的消息中间件,RocketMQ能够保证消息不丢失但不保证消息不重复。如果在RocketMQ中实现消息去重实际也是可以的,但是考虑到高可用以及高性能的需求,如果做了服务端的消息去重,RocketMQ就需要对消息做额外的rehash、排序等操作,这会花费较大的时间和空间等资源代价,收益并不明显。RocketMQ考虑到正常情况下出现重复消息的概率其实是很小的,因此RocketMQ将消息幂等操作交给了业务方处理。
实际上上述问题的本质在于:网络调用本身存在不确定性,也就是既不成功也不失败的第三种状态,即所谓的 处理中 状态,因此会有重复的情况发生。这个问题是很多其他的MQ产品同样会遇到的,通常的方法就是要求消费方在消费消息时进行去重,也就是本文我们说的消费幂等性。
对RocketMQ有一定使用经验的读者可能注意到,每条消息都有一个MessageID,那么我们能否使用该ID作为去重依据,也就是上面提到的幂等令牌呢?
答案是否定的,因为MessageID可能出现冲突的情况,因此不建议通过MessageID作为处理依据而应当使用业务唯一标识如:订单号、流水号等作为幂等处理的关键依据。
上面也提到了,幂等依据应当由消息生产者生成,在发送消息时候,我们能够通过消息的key设置该id,对应的API为 org.apache.rocketmq.common.message.setKeys(String keys) 代码如下:
1Message sendMessage = new Message(
2 MessageProtocolConst.WALLET_PAY_TOPIC.getTopic(),
3 message.getBytes());
4
5sendMessage.setKeys("OD0000000001");
当消息消费者收到该消息时,根据该消息的key做幂等处理,API为 org.apache.rocketmq.common.message.getKeys() 代码如下:
1(msgs, context) -> {
2 try {
3 // 默认msgs只有一条消息
4 for (MessageExt msg : msgs) {
5 String key = msg.getKeys();
6 return walletCharge(msg);
7 }
8 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
9 } catch (Exception e) {
10 LOGGER.error("钱包扣款消费异常,e={}", e);
11 return ConsumeConcurrentlyStatus.RECONSUME_LATER;
12 }
13}
消费者通过getKeys()能够读取到生产者设置的幂等依据(如:订单号等),然后业务逻辑围绕该id进行幂等处理即可。
如果你觉得每次都需要在生产者侧setkey,在消费者侧getkey,有点繁琐。也可以将该幂等依据设置在消息协议中,消费者接收到消息后解析该id进行幂等操作也是可以的。只需要消息的生产者和消费者约定好如何解析id的协议即可。
具体的幂等逻辑视使用的场景而定,我在这里尝试从我的经验进行一些总结。
消费端常见的幂等操作
业务操作之前进行状态查询
消费端开始执行业务操作时,通过幂等id首先进行业务状态的查询,如:修改订单状态环节,当订单状态为成功/失败则不需要再进行处理。那么我们只需要在消费逻辑执行之前通过订单号进行订单状态查询,一旦获取到确定的订单状态则对消息进行提交,通知broker消息状态为:ConsumeConcurrentlyStatus.CONSUME_SUCCESS 。业务操作前进行数据的检索
逻辑和第一点相似,即消费之前进行数据的检索,如果能够通过业务唯一id查询到对应的数据则不需要进行再后续的业务逻辑。如:下单环节中,在消费者执行异步下单之前首先通过订单号查询订单是否已经存在,这里可以查库也可以查缓存。如果存在则直接返回消费成功,否则进行下单操作。唯一性约束保证最后一道防线
上述第二点操作并不能保证一定不出现重复的数据,如:并发插入的场景下,如果没有乐观锁、分布式锁作为保证的前提下,很有可能出现数据的重复插入操作,因此我们务必要对幂等id添加唯一性索引,这样就能够保证在并发场景下也能保证数据的唯一性。引入锁机制
上述的第一点中,如果是并发更新的情况,没有使用悲观锁、乐观锁、分布式锁等机制的前提下,进行更新,很可能会出现多次更新导致状态的不准确。如:对订单状态的更新,业务要求订单只能从初始化->处理中,处理中->成功,处理中->失败,不允许跨状态更新。如果没有锁机制,很可能会将初始化的订单更新为成功,成功订单更新为失败等异常的情况。
高并发下,建议通过状态机的方式定义好业务状态的变迁,通过乐观锁、分布式锁机制保证多次更新的结果是确定的,悲观锁在并发环境不利于业务吞吐量的提高因此不建议使用。消息记录表
这种方案和业务层做的幂等操作类似,由于我们的消息id是唯一的,可以借助该id进行消息的去重操作,间接实现消费的幂等。
首先准备一个消息记录表,在消费成功的同时插入一条已经处理成功的消息id记录到该表中,注意一定要 与业务操作处于同一个事物 中,当新的消息到达的时候,根据新消息的id在该表中查询是否已经存在该id,如果存在则表明消息已经被消费过,那么丢弃该消息不再进行业务操作即可。
肯定还有更多的场景我没有涉及到,这里说到的操作均是互相之间有关联的,将他们配合使用更能够保证消费业务的幂等性。
不论怎样,请牢记一个原则:缓存是不可靠的,查询是不可靠的 。
在高并发的场景下,一定要通过持久化存储的唯一索引以及引入锁机制作为共同保障数据准确性和完整性的最后一道防线!
总结
本文主要讲解了何为幂等及消息消费场景下如何传递唯一幂等id,并进一步分析了如何保证消息幂等的思路以及总结了常见的消息幂等处理方式。
套路是多变的,关键是掌握思路和方法,我们的原则就是 不管执行多少次,业务表现出来的行为是统一的 , 在这个前提下,我们引入了操作前查库、操作前查缓存、乐观锁/分布式锁机制、加入唯一索引等多重防重放策略,通过这些策略的综合作用,最终达到了消息幂等的目的。
最后有句话分享,有道无术术可求。有术无道止于术。相信聪明的你一定会在技术的道路上结合实际场景将各种技术手段融会贯通,从而走的越来越远。
作者简介:SnoWalker,中间件发烧友,RocketMQ北京社区联合发起人之一,社区布道师。专注于后端分布式领域,对分布式架构、中间件原理、Devops有着较为深刻的认识和丰富的实践经验。
以上是关于跟我学RocketMQ之消息幂等的主要内容,如果未能解决你的问题,请参考以下文章