RocketMQ 事务型消息 + 异步扣减库存实现
Posted offerNotFound
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ 事务型消息 + 异步扣减库存实现相关的知识,希望对你有一定的参考价值。
RocketMQ 事务型消息
事务型消息为了保证最终一致性(就是本地执行的事务与消费者的消费操作保持原子性)会用到两阶段提交,这里就简单记录下其中的过程。
图解:生产者(Preducer
)、消费者(Consumer
)、代理服务器(Broker
,因为 Broker
会有 Name Server
来绑定,所以这里就简称MQ Server
)和 数据库 mysql
。
两阶提交段步骤:
- 首先,生产者会发送一个半成品消息给代理(不让消费者去消费),然后代理会给消费者返回一个 OK 。
- 本地执行事务,生产者这时就会访问
MySQL
执行一个事务。 - 如果本地事务执行成功了,生产者就会告诉代理让消费者可以去消费了;如果本地事务执行失败了,生产者就会告诉代理回滚不让消费者感知到。
- 如果上述第三步失败了(即生产者通知代理由于一些原因没通知到),而本地事务执行成功了。这时代理因为只收到了一个半成品消息,却久久没收到通知,它这时就会有一个回查机制自动去核查(每隔一段时间去查一下,并且间隔时间会越来越大)。
- 生产者收到代理通知的核查后,它就会去检查数据库刚才的数据提交了没有。
- 生产者核查完后就会有一个结果,这个核查结果消费者就会反馈给代理(成功了 commit,消费者去消费;失败了就 rollback)
前三步可以看做是第一阶段,后三步核查可以看做第二阶段。
那么如果回查也一直失败,都回查了几天了,依然没有一个成功的反馈呢?这时因为数据是存在硬盘里的(硬盘的空间足够大),就会把消息放到死性队列里,再由运维人员或开发人员去处理。
其中需要代码实现的操作有:
- ① 生产者发送半成品消息给代理
- ② 生产者执行本地事务
- ③ 消费者回查数据库
异步扣减库存
个人的秒杀项目里记录下使用 RoceketMQ 事务型消息 实现异步扣减库存操作。由于项目里用到了缓存,所以将库存复制一份放缓存里,当用户下单时区扣减缓存里的库存,等过一会再由 MQ 的消费者去扣减数据库里的缓存与更新销量。
那么大致过程如下:
- 生产者发消息给代理
- 生产者执行的本地事务就是先扣减缓存中的库存,再创建订单。
- 第二步成功了,消费者才去扣减数据库的缓存。失败就不扣减,保持一致性。
但是光有这些还不够,因为有回查机制的存在,当生产者核查数据库时没查的东西(检查订单的话看不出问题来)。所以还需要给库存创建一个流水记录,让生产者去检查流水来判断库存的状态,到底库存是减了还是没减。
相比于直接检查数据库中的库存的话,使用流水的两个原因:①数据库里的库存与缓存里的存库状态时异步进行的,检查的得是缓存里的库存是否出问题了 ② 流水是一张单独的表,相比于库存那张表,锁的力度很低,性能上影响不大
正确过程:
- 先创建一个流水,生产者再发消息给代理
- 生产者执行的本地事务就是先扣减缓存中的库存,再创建订单,最后还需要更新流水状态与销量。
- 第二步成功了,消费者才去扣减数据库的缓存。失败就不扣减,保持一致性。
- 这时如果需要回查,生产者就会去检查数据库里存的流水(即与缓存里的库存相关的流水)。
以上是关于RocketMQ 事务型消息 + 异步扣减库存实现的主要内容,如果未能解决你的问题,请参考以下文章