消息队列聊一下如何避免消息的重复消费
Posted qxlxi
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列聊一下如何避免消息的重复消费相关的知识,希望对你有一定的参考价值。
什么是重复消费
一条消息在传输过程中,为了保证消息的不丢失,可能会多少量的消息进行重试,这样就可能导致Broker接受到的消息出现重复,如果说下游系统没有针对业务上的处理,那么可能导致同一笔借款或者支付订单出现重复扣款或者重复还款的情况。业务上是不允许出现的。
在MQTT 协议,给出了如下三种方式
- 至多一次 (At Most Once) : 也就是针对每条消息即使出现异常的情况,也只会发送一次。应用场景:在一些硬件数据上传热点数据,可以使用。
- 至少一次(At Least Once) : 为了保证消息不可靠传输,可能针对少量消息进行重复发送,那么就会出现同一个消息重复出现。
- 恰好一次(Exactly Once) : 消息只会发送一次,不允许丢失也不会重复。
幂等性
幂等在计算机中是非常重要的概念,说白了就是多次执行一段函数的结果是一样的,比如说 * 1,那么 执行多次结果都是本身,但是如果是+1的操作,那么就不是幂等,每次结果都会加1。而数据库中增删改查,只有查是幂等,其他都会修改数据原有的状态。
在重试的场景下,我们一般都需要进行重试&幂等 进行配合使用,因为在数据在网络中传输,比如出现网络抖动,数据丢包以及挖掘机挖断网线(开玩笑😝了)这个时候 一般上游系统就会重试操作,而下游系统就需要支持幂等。
往大了说在分布式高可用设计架构中,幂等&重试 也是我们需要在架构设计中要考虑的要点,不仅仅设计中间件的应用中,所以你看知识都是相通的。
接着我们说,既然可能出现消息的重复消费,那么要么在Producer端保证消息的幂等发送,要么在Consumer端保证幂等消费。
Producer 幂等
在Kafka 0.11版本后,引入了幂等性和事务。具体就是通过在配置对象中添加如下
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
Producer 自动升级成幂等性 Producer
重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition 表示分区号;Sequence Number是单调自增的。
所以幂等性只能保证的是在单分区单会话内不重复。 说白了就是,每次重启Broker,就会失效。
但是如果我们想保证在所有分区下Producer的幂等性,就需要考虑事务型Producer。这也是幂等型Producer和事务型Producer的最大区别。
Kafka事务原理
事务Producer
事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。
设置事务型 Producer 的方法也很简单,满足两个要求即可:
- 和幂等性 Producer 一样,开启 enable.idempotence = true。
- 设置 Producer 端参数 transctional. id。最好为其设置一个有意义的名字。
kafkaProducer.initTransactions(); //初始化
kafkaProducer.beginTransaction(); //开启事务
try
for (int i = 0; i < 5; i++)
kafkaProducer.send(new ProducerRecord<>("test1","aaa"+i));
kafkaProducer.commitTransaction(); //提交事务
catch (Exception e)
kafkaProducer.abortTransaction(); //出现异常 事务回滚
幂等性 Producer 只能保证单分区、单会话上的消息幂等性;而事务能够保证跨分区、跨会话间的幂等性。
但是事务型性能比较差。
Consumer幂等
上面我们说了生产端的幂等发送,但是只在生产端保证其实不够,消费端也需要保证幂等消费。
也即:At least once + 幂等消费 = Exactly once。
- 数据库唯一约束/redis set Nx
- 为更新数据添加条件
- 记录并检查操作
我们来总体说一下上面的三种方案,
第一种,一般就是在消费端消费的时候将消费数据插入到DB中,并且保证唯一约束,当消费一次后,即使同一条消息在消费一次,那么数据库层面唯一约束也会保证存在记录,不会执行后面的逻辑。或者使用redis的setNx进行保证。
第二种,在消费端消费消息的时候,可以判断一下当前的状态或者数据是否是已经处理过的状态,如果是直接丢弃,比如生产端发送的消息是初始化订单状态,但是消费端如果接受到的是处理中订单,说明已经处理过,没必要进行处理。
小结
本节主要介绍了消息重复消费,以及引出幂等性,然后分别是生产端(幂等/事务)以及消费端的幂等,但是在开篇已经说了,幂等超时重试不仅仅在消息队列中存在,在HTTP服务设计,保证表单或者APP重复提交,以及在微服务设计幂等,保证RPC自动重试也是同样适用的。
RabbitMQ如何避免消息重复投递或重复消费?
在消息生产时,MQ 内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重的依据(消息投递失败并重传), 避免重复的消息进入队列;
在消息消费时,要求消息体中必须要有一个bizId( 对于同一业务全局唯一,如支付ID、订单ID、帖子ID 等) 作为去重的依据, 避免同一条消息被重复消费。
以上是关于消息队列聊一下如何避免消息的重复消费的主要内容,如果未能解决你的问题,请参考以下文章
SpringCloud(28)——Stream重复消费与持久化
Kafka在高并发的情况下,如何避免消息丢失和消息重复?kafka消费怎么保证数据消费一次?数据的一致性和统一性?数据的完整性?