MQ消息丢失,消息一致性,重复消费解决方案

Posted 欢少的成长之路

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MQ消息丢失,消息一致性,重复消费解决方案相关的知识,希望对你有一定的参考价值。

大家好,我是Leo。

这是开端的第三次循环了。当前正在正处于RocketMQ基础原理。

3万字聊聊什么是RocketMQ(一)

4万字聊聊阿里二面,保证你看不完

聊聊Redis面试题

2万字聊聊什么是秒杀系统(中)

3万字聊聊什么是Redis(完结篇)

3万字聊聊什么是MySQL(初篇)

本章概括

分布式事务

由何而来

我们在使用MQ在解决实际业务场景中的问题时,往往伴随诸多问题!比如如下图

上述两种可能都会导致数据不一致,在业务系统中是 致命的问题

这个时候我们就要保证事务消息。要不全部成功,要不全部失败。来达到订单服务,购物车服务的数据一致性!

对于购物车服务收到订单创建成功消息清理购物车这个操作来说,失败的处理比较简单,只要成功执行购物车清理后再提交消费确认即可,如果失败,由于没有提交消费确认,消息队列会自动重试。

解决了购物车服务问题,剩下的就是订单服务这边的创建订单,生产消息这两步了。要么全部成功,要么全部失败,不允许一个成功,一个失败的情况。

一旦订单控制不住,购物车那边也是控制不住的! 这就是事务需要解决的问题了!

什么是分布式事务

事务就是为了保证这些数据的完整性和一致性,我们希望这些更新操作要么全部成功,要么全部失败。这就是我们通过对事务的理解。如果严格来说,MQ的事务和mysql一样,都具有四种属性 ACID

  1. 原子性:一个事务操作不可分割,要么成功,要么失败,不能有一半成功一半失败的情 况
  2. 一致性:这些数据在事务执行完成这个时间点之前,读到的一定是更新前的数据,之后 读到的一定是更新后的数据,不应该存在一个时刻,让用户读到更新过程中的数据
  3. 隔离性:指一个事务的执行不能被其他事务干扰。即一个事务内部的操作及使用的数据对 正在进行的其他事务是隔离的,并发执行的各个事务之间不能互相干扰
  4. 持久性:指一个事务一旦完成提交,后续的其他操作和故障都不会对事务的结果产生任何 影响

对于单体服务来说,都实现了ACID,但是对于分布式系统来说,实现ACID这几乎是不可能的,或者说代价太大。所有目前大家所说的分布式事务,更多的情况下,是一种分布式事务的不完整实现。不同的应用场景中,有不同的实现,目的都是通过一些妥协来解决实际问题。

比较常见的分布式事务有

  • 2PC(Two-phase Commit,也叫二阶段提 交)
  • TCC(Try-Confirm-Cancel)
  • 和事务消息

事务消息适用的场景主要是那些需要异步更新数据,并且对数据实时性要求不太高的场景。 比如我们在开始时提到的那个例子,在创建订单后,如果出现短暂的几秒,购物车里的商品没有被及时清空,也不是完全不可接受的,只要最终购物车的数据和订单数据保持一致就可以了。

剩下的就不做过多解释了。

MQ是如何实现的

MQ主要借助的是 半消息 实现的,如下图

  1. 订单服务首先会开启一个事务,就类似于MySQL那样。
  2. 对MQ生产一个半消息
  3. 以上都没有问题之后,就会执行事务,写入数据库
  4. 提交事务或回滚事务

这里的半消息,并不是只有一半的数据。而是有全部的数据,这里的半只是 在事务提交之前,对于消费者来说,这个消息是不可见的

到了这里,订单服务肯定是没有问题的,所以把数据写入到MQ的Broker之后

这里回顾一下生产端的交互流程,可以参考下列图片,理解

  1. 订单服务会向MQ的Broker发送一个ACK包
  2. 如果Broker确认收到了,会给订单服务回一个ACK+SYN包 (如果Broker没有收到,会开始重传)
  3. 如果Broker收到了,一定可以确保订单服务的数据执行完成,以及确保数据已经到Broker了。

到了这里,订单服务,Broker端是没有问题的,把数据写入Broker之后,购物车服务就会开始进行消费这条消息

这里回顾一下消费端的交互流程,可以参考下列图片,理解

  1. 购物车服务在监听收到消息后进行消费
  2. 当购物车服务执行了当前的逻辑之后,会给Broker发送一个 ACK+SYN包确认消费
  3. 如果购物车服务没有给Broker回复,那么Broker就会开始重发

到了这里,订单服务,Broker端,购物车服务基本实现了 要么成功,要么失败 的一致性要求。

天网恢恢疏而不漏,在第四步的时候提交事务,如果失败了怎么办?

Kafka 的解决方案比较简单粗暴,直接抛出异常,让用户自行处理。我们可以在业务代码中 反复重试提交,直到提交成功,或者删除之前创建的订单进行补偿

RocketMQ是如何实现的

这里RocketMQ也给出了相应的应对策略!在事务实现中,他加了 事务反查的机制 来解决事务的提交失败问题。

如果订单服务,在提交或者回滚事务消息时发生网络异常,RocketMQ 的 Broker 没有收到提交或者回滚的请求,Broker 会定期去订单服务上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务。

为了支撑这个机制,我们需要做一个反查本地事务状态的接口,告知RocketMQ本地事务是否成功。

例如 只需要根据消息中的订单ID,检查这个订单是否创建成功即可

这个反查本地事务的实现,并不依赖订单服务的某个实例节点上的任何数据。这种情况下,即使是发送事务消息的那个订单服务节点宕机了,RocketMQ 依然 可以通过其他订单服务的节点来执行反查,确保事务的完整性

确保消息不会丢失

聊到消息一致性,可靠性传输,我们可以从问题的根源入手。我先列举一些容易出问题的故障点

  • **生产阶段:**在这个阶段,从消息在 Producer 创建出来,经过网络传输发送到 Broker 端。
  • **存储阶段:**在这个阶段,消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。
  • **消费阶段:**在这个阶段,Consumer 从 Broker 上拉取消息,经过网络传输发送到 Consumer 上。

生产阶段

在生产阶段,消息队列通过最常用的请求确认机制,来保证消息的可靠传递:当你的代码调用发消息方法时,消息队列的客户端会把消息发送到 Broker,Broker 收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到响应后,完成了一次正常消息的发送。

只要 Producer 收到了 Broker 的确认响应,就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户。

你在编写发送消息代码时,需要注意,正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢失

存储阶段

在存储阶段正常情况下,只要 Broker 在正常运行,就不会出现丢失消息的问题,但是如果 Broker 出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。

如果对消息的可靠性要求非常高,可以通过配置 Broker 参数来避免因为宕机丢消息。

对于单个节点的 Broker,需要配置 Broker 参数,在收到消息后,将消息写入磁盘后再给 Producer 返回确认响应,这样即使发生宕机,由于消息已经被写入磁盘,就不会丢失消息,恢复后还可以继续消费。例如,在 RocketMQ 中,需要将刷盘方式 flushDiskType 配置为 SYNC_FLUSH 同步刷盘。

集群我不会,后续再更新。

消费阶段

消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。如果 Broker 没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。

你在编写消费代码时需要注意的是,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。

消息丢失检测

前期代码健壮性不友好的情况,可以在拦截器里编写日志输出,把消费的id号记录下来。

  • 生产者,生产一条就记录一条
  • 消费者,消费一条就记录一条

这样这样两边对照就可以把丢失的id号 定位出来。也可以通过分布式链路追踪系统 扯远了,以后再说吧

确保消息不被重复消费

为什么会有重复消息

在消息传递过程中,如果出现传递失败的情况,发送方会执行重试,重试的过程中就有可能 会产生重复的消息。对使用消息队列的业务系统来说,如果没有对重复消息进行处理,就有可能会导致系统的数据出现错误。

所以重复消费的情况必然存在

在MQTT协议中,大概提供了三种标准

  1. At most once: 至多一次。消息在传递时,最多会被送达一次。换一个说法就是,没什 么消息可靠性保证,允许丢消息。一般都是一些对消息可靠性要求不太高的监控场景使 用,比如每分钟上报一次机房温度数据,可以接受数据少量丢失。
  2. At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消 息,但是允许有少量重复消息出现。
  3. Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重 复,这个是最高的等级。

大多数的消息队列,都是采用的 At least once: 至少一次

根据上面介绍,我们可以得知 消息队列很难保证消息不重复

既然消息队列,无法保证重复消费的问题,那我们就要在程序里解决这个问题了。

如何解决重复消费(幂等性)

幂等性是一个数学上的概念,它是这样定义的:如果一个函数 f(x) 满足:f(f(x)) = f(x),则函数 f(x) 满足幂等性。

这里被扩展到计算机领域,被广泛的应用于多次执行产生的影响均与一次执行的影响相同

使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。所以,对于幂等的方法,不用担心重复执行会对系统造成任何改变。

(这里可以联想到 用户充值,多次消费充值的话,肯定是有问题的!)

如果说MQ解决不了数据重复消费的问题,那么现在可以转化成 At least once + 幂等性 = Exactly once 这样就可以保证重复消费了。主要有下列三种方法

  • 数据库的唯一约束实现幂等
  • 为更新的数据设置前置条件
  • 记录并检查操作

数据库的唯一约束实现幂等

我先举一个我自己系统的例子:用户在充值账号余额时,会产生一个账单ID。

我们在实现唯一约束的时候就可以重新创建一个表。伪代码如下

create table aaa(
    id  bigint(15) not null comment '约束id',
    user_id bigint(15) not null comment '用户id',
    bill_id bigint(15) not null comment '账单id',
    money decimal(10,2) not null comment '充值金额',
    PRIMARY KEY (`id`) USING BTREE,
    KEY `adasdasdas` (`user_id`,`bill_id`),  -- 唯一约束  用户di和账单id
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='账单约束表';

这样,我们消费消息的逻辑可以变为:“在转账流水表中增加一条转账记录,然后再根据转账记录,异步操作更新用户余额即可。”在转账流水表增加一条转账记录这个操作中,由于我们在这个表中预先定义了“账户 ID 转账单 ID”的唯一约束,对于同一个转账单同一个账户只能插入一条记录,后续重复的插入操作都会失败,这样就实现了一个幂等的操作。我们只要写一个 SQL,正确地实现它就可以了。

基于这个思路,不光是可以使用关系型数据库,只要是支持类似“INSERT IF NOT EXIST”语义的存储类系统都可以用于实现幂等,比如,你可以用 Redis 的 SETNX 命令来替代数据库中的唯一约束,来实现幂等消费。

参考李玥老师的 消息队列高手课 思想

为更新的数据设置前置条件

在更新数据时,我们可以设置一个更新前的值,如下图。

这里可以加一个充值前金额,这里因为我的体量,并发不大,暂时没加,后面我会根据老板的要求再加的。

如果有重复订单打过来,那我就可以计算充值前的金额,以及当前的付款金额。来付款来实现幂等性。

也可以通过版本号控制,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等更新。

在修改数据记录并检查操作

可以采用Token,UUID的方式实现幂等性。这种方式是通用性比较强的。实现的思路特别简单:在执行数据更新操作之前,先检查一下是否执行过这个更新操作。

具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。

结尾

有些不懂的地方或者不对的地方,麻烦各位指出,一定修改优化!

消息中间件(消息队列MQ)简介

一、为什么要使用MQ

1. 异步:快速返回

2. 解耦:解除依赖

3. 削峰填谷

 

二、MQ的缺点

1. 系统可用性降低,因为MQ可能会挂

2. 系统复杂性提高,要考虑消息重复、丢失、顺序等问题

3. 数据一致性问题,生产者并不知道消费者是否真正消费了

 

三、怎么保证MQ消息不丢失

1. 生产者丢失数据,confirm机制

2. MQ丢失数据,持久化到磁盘

3. 消费者丢失数据,确认机制

 

四、怎么保证MQ高可用性

1. 单机模式

2. 普通集群模式,无法做到真正的高可用

3. 镜像集群模式,高可用但是性能低

 

以上是关于MQ消息丢失,消息一致性,重复消费解决方案的主要内容,如果未能解决你的问题,请参考以下文章

MQ问题及解决方案

mq如何保证高可用,解决重复消费、数据丢失问题和顺序性问题

MQ-面试题

MQ那点破事,消息丢失重复消费消费顺序堆积事务高可用....

面试官杠上重复消费、消息堆积、消息丢失、顺序消息?

浅谈RocketMQ如何保证消息不丢失