怎么处理消息重发的问题?

Posted 技术修行者

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了怎么处理消息重发的问题?相关的知识,希望对你有一定的参考价值。

这篇文章主要描述如何解决消息重发的问题,目前主流的消息队列产品都采用了At least once的服务质量,这就导致了很难避免消息重发的情况,我们可以将消费者业务逻辑设计成幂等服务来解决消息重发问题。

消息队列在消息传递的过程中,如果出现传递失败的情况,发送方会重试,在重试的过程中,可能会产生重复的消息。

消息重复的情况必然存在

关于传递消息时能够提供的服务质量标准,MQTT协议给出了三种不同的标准:

  1. At most once:至多一次,消息在传递时,最多会被送达一次,一般适用于对消息可靠性要求不高的监控场景。
  2. At least once:至少一次,消息在传递时,至少会被送达一次,不允许丢消息,但是允许有少量重复消息。
  3. Exactly once:恰好一次,消息在传递时,只会被送达一次,不允许丢失也不允许重复。

我们常用的大部分消息队列提供的服务质量都是At least once,包括RocketMQ、RabbitMQ和Kafka。所以说消息队列很难保证消息不重复。

怎么解决消息重发的问题

既然消息队列不可避免的会有消息重发的问题,那么我们应该怎么去解决呢?

一般解决重复消息的办法是在消费端,让我们的消费消息的操作具有幂等性

一个幂等操作的特点是将其任意多次执行所产生的影响与一次执行的响应相同。一个幂等的方法, 使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是样的。所以我们不需要担心针对幂等的方法执行多次会对系统造成任何改变。

从对系统的影响结果来看:At least once + 幂等消费 = Exactly once。

如何实现幂等操作呢?可以从业务逻辑设计上少,将消费的业务逻辑设计成具有幂等性的操作。

接下来,我们来看三种不同方式来实现幂等。

利用数据库的唯一约束实现幂等

首先,我们可以限定,对于每个转账单每个账户只可以执行一次变更操作,在分布式系统中,这个限制实现的方法非常多,最简单的是我们在数据库中建一张转账流水表,这个表有三个字段:转账单 ID、账户 ID 和变更金额,然后给转账单 ID 和账户 ID 这两个字段联合起来创建一个唯一约束,这样对于相同的转账单 ID 和账户 ID,表里至多只能存在一条记录。

这样,我们消费消息的逻辑可以变为:“在转账流水表中增加一条转账记录,然后再根据转账记录,异步操作更新用户余额即可。”在转账流水表增加一条转账记录这个操作中,由于我们在这个表中预先定义了“账户ID转账单ID”的唯一约束,对于同一个转账单同一个账户只能插入一条记录,后续重复的插入操作都会失败,这样就实现了一个幂等的操作。我们只要写一个 SQL,正确地实现它就可以了。

只要是支持类似“INSERT IF NOT EXIST”语义的存储类系统都可以用于实现幂等操作。

为更新的数据设置前置条件

这种方式的思路是:给数据变更设置一个前置条件,如果满足条件就更新数据,否则就拒绝更新数据,在更新数据的同时,变更前置条件中需要判断的数据。这样在重复执行这个操作时,由于第一次更新数据的时候已经变更了前置条件中需要判断的依据,不满足前置条件,则不会重复执行数据更新操作。

如果我们更新的是一个复杂的业务相关数据,我们可以为数据增加一个版本号属性,,每次更新之前,比较当前数据版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号加1。

记录并检查操作

这种方式的思路是:在发送消息的时候,给每条消息指定一个全局唯一ID,消费时,先根据这个ID检查这条消息是否有被消费过,如果没有消费过,才更新数据,不然就将消费状态置为已消费。

以上是关于怎么处理消息重发的问题?的主要内容,如果未能解决你的问题,请参考以下文章

朋友们好,请教tcp/ip中tcp重发的次数和超时时间是多少

ActiveMQ中消息的重发与持久化保存

学习ActiveMQ:JMS消息的确认与重发机制

springboot+activemq中引入重发机制

7道消息队列ActiveMQ面试题!

微信发送消息怎么支持HTML代码转换