RocketMQ 事务型消息 + 异步扣减库存实现

Posted offerNotFound

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ 事务型消息 + 异步扣减库存实现相关的知识,希望对你有一定的参考价值。

RocketMQ 事务型消息

事务型消息为了保证最终一致性(就是本地执行的事务与消费者的消费操作保持原子性)会用到两阶段提交,这里就简单记录下其中的过程。

图解:生产者(Preducer)、消费者(Consumer)、代理服务器(Broker,因为 Broker 会有 Name Server 来绑定,所以这里就简称MQ Server)和 数据库 mysql

两阶提交段步骤:

  1. 首先,生产者会发送一个半成品消息给代理(不让消费者去消费),然后代理会给消费者返回一个 OK 。
  2. 本地执行事务,生产者这时就会访问 MySQL 执行一个事务。
  3. 如果本地事务执行成功了,生产者就会告诉代理让消费者可以去消费了;如果本地事务执行失败了,生产者就会告诉代理回滚不让消费者感知到。
  4. 如果上述第三步失败了(即生产者通知代理由于一些原因没通知到),而本地事务执行成功了。这时代理因为只收到了一个半成品消息,却久久没收到通知,它这时就会有一个回查机制自动去核查(每隔一段时间去查一下,并且间隔时间会越来越大)。
  5. 生产者收到代理通知的核查后,它就会去检查数据库刚才的数据提交了没有。
  6. 生产者核查完后就会有一个结果,这个核查结果消费者就会反馈给代理(成功了 commit,消费者去消费;失败了就 rollback)

前三步可以看做是第一阶段,后三步核查可以看做第二阶段。

那么如果回查也一直失败,都回查了几天了,依然没有一个成功的反馈呢?这时因为数据是存在硬盘里的(硬盘的空间足够大),就会把消息放到死性队列里,再由运维人员或开发人员去处理。

其中需要代码实现的操作有:

  • ① 生产者发送半成品消息给代理
  • ② 生产者执行本地事务
  • ③ 消费者回查数据库

异步扣减库存

个人的秒杀项目里记录下使用 RoceketMQ 事务型消息 实现异步扣减库存操作。由于项目里用到了缓存,所以将库存复制一份放缓存里,当用户下单时区扣减缓存里的库存,等过一会再由 MQ 的消费者去扣减数据库里的缓存与更新销量。

那么大致过程如下:

  1. 生产者发消息给代理
  2. 生产者执行的本地事务就是先扣减缓存中的库存,再创建订单。
  3. 第二步成功了,消费者才去扣减数据库的缓存。失败就不扣减,保持一致性。

但是光有这些还不够,因为有回查机制的存在,当生产者核查数据库时没查的东西(检查订单的话看不出问题来)。所以还需要给库存创建一个流水记录,让生产者去检查流水来判断库存的状态,到底库存是减了还是没减。

相比于直接检查数据库中的库存的话,使用流水的两个原因:①数据库里的库存与缓存里的存库状态时异步进行的,检查的得是缓存里的库存是否出问题了 ② 流水是一张单独的表,相比于库存那张表,锁的力度很低,性能上影响不大

正确过程:

  1. 先创建一个流水,生产者再发消息给代理
  2. 生产者执行的本地事务就是先扣减缓存中的库存,再创建订单,最后还需要更新流水状态与销量。
  3. 第二步成功了,消费者才去扣减数据库的缓存。失败就不扣减,保持一致性。
  4. 这时如果需要回查,生产者就会去检查数据库里存的流水(即与缓存里的库存相关的流水)。

以上是关于RocketMQ 事务型消息 + 异步扣减库存实现的主要内容,如果未能解决你的问题,请参考以下文章

第七章 缓存库存

第八章 事务型消息

RocketMQ 事务消息

16 基于MQ实现秒杀订单系统的异步化架构以及精准扣减库存的技术方案

RabbitMQ 消息顺序消息幂等消息重复消息事务集群

RocketMQ 消息详解