RocketMQ——Transaction Message(事务消息)

Posted 1013wang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ——Transaction Message(事务消息)相关的知识,希望对你有一定的参考价值。

分布式事务

通过MQ解决分布式事务的思路

1) 业务和消息生成耦合在一起

2) 业务和消息解耦

RocketMQ 中的事务消息

1) 目前RMQ3.2.6中事务消息的实现原理及存在的问题

2) 问题解决思路

本文介绍RocketMQ提供的第三种类型的消息——Transaction Message(事务消息)。在说事务消息之前,我们先来说说分布式事务的那些事!

分布式事务

什么是分布式事务,我的理解是一半事务。怎么说,比如有2个异构系统,A异构系统要做T1,B异构系统要做T2,要么都成功,要么都失败。
要知道异构系统,很显然,不在一个数据库实例上,它们往往分布在不同物理节点上,本地事务已经失效

技术图片

 

 2阶段提交

2阶段提交协议,Two-Phase Commit,是处理分布式事务的一种常见手段。2PC,存在2个重要角色:事务协调器(TC),事务执行者。
2PC,可以看到节点之间的通信次数太多了,时间很长!时间变长了,从而导致,事务锁定的资源时间也变长了,造成资源等待时间变长!在高并发场景下,存在严重的性能问题!

通过MQ解决分布式事务的思路

下面,我们来看看MQ在高并发场景下,是如何解决分布式事务的。

考虑生活中的场景:

我们去北京庆丰包子铺吃炒肝,先去营业员那里付款(Action1),拿到小票(Ticket),然后去取餐窗口排队拿炒肝(Action2)。思考2个问题:第一,为什么不在付款的同时,给顾客炒肝?如果这样的话,会增加处理时间,使得后面的顾客等待时间变长,相当于降低了接待顾客的能力(降低了系统的QPS)。第二,付了款,拿到的是Ticket,顾客为什么会接受?从心理上说,顾客相信Ticket会兑现炒肝。事实上也是如此,就算在最后炒肝没了,或者断电断水(系统出现异常),顾客依然可以通过Ticket进行退款操作,这样都不会有什么损失!(虽然这么说,但是实际上包子铺最大化了它的利益,如果炒肝真的没了,浪费了顾客的时间,不过顾客顶多发发牢骚,最后接受)

生活已经告诉我们处理分布式事务,保证数据最终一致性的思路!这个Ticket(凭证)其实就是消息!

1) 业务和消息生成耦合在一起

技术图片

 

                                                                                                                               业务和消息生成耦合在一起

业务操作和消息的生成耦合在一起,保证了只要A银行的账户发生扣款,那么一定会生成一条转账消息。只要A银行系统的事务成功提交,我们可以通过实时消息服务,将转账消息通知B银行系统,如果B银行系统回复成功,那么A银行系统可以在table中设置这条转账消息的状态。
这样耦合的方式,从架构上来看,就有点不太优雅,而且存在一些问题。比如说,消息的存储实质上是在A银行系统中的,如果A银行系统出了问题,将导致无法转账。如果解耦,将消息独立出来呢?

2) 业务和消息解耦

技术图片

 

 

如上图所示,消息数据独立存储,业务和消息解耦,实质上消息的发送有2次,一条是转账消息,另一条是确认消息。

RocketMQ 中的事务消息

1) 目前RMQ3.2.6中事务消息的实现原理及存在的问题

到这里,先来看看基于RocketMQ的代码:

技术图片

 

                                                                                                       生产者示例代码
生产者这里用到是:TransactionMQProducer。
这里涉及到2个角色:本地事务执行器(代码中的TransactionExecuterImpl)、服务器回查客户端Listener(代码中的TransactionCheckListener)。如果事务消息发送到MQ上后,会回调  本地事务执行器;但是此时事务消息是prepare状态,对消费者还不可见,需要  本地事务执行器  返回RMQ一个确认消息。
技术图片

 

 

本地事务执行器

 

事务消息是否对消费者可见,完全由事务返回给RMQ的状态码决定(状态码的本质也是一条消息)。

技术图片

 

                                                                                                       回查Listener     

生产者发送了2条消息给RMQ,有一条本地事务执行成功,有一条本地事务执行失败。总共是2条业务消息 + 2条确认消息,因此是4条。注意到消费者只消费了一条数据,就是只有告诉RMQ本地事务执行成功的那条消息才会被消费!因此是1条!

但是,注意到本地事务执行失败的消息,RMQ并没有check listener?

这是为什么呢?因为RMQ在3.0.8的时候还是支持check listener回查机制的,但是到了3.2.6的时候将事务回查机制“阉割”了!

那么3.0.8的时候,RMQ是怎么做事务回查的呢?

看一看源码,你会知道,其实事务消息开始是prepare状态,然后RMQ会将其持久化到mysql当中,然后如果收到确认消息,就删除掉这条prepare消息,如果迟迟收不到确认消息,那么RMQ会定时的扫描prepare消息,发送给produce group进行回查确认!

到这里,问题来了,要知道3.2.6版本,没有回查机制了,会存在问题么?

当然会存在问题!假设,我们发送一条转账事务消息给RMQ,成功后回调本地事务,DB减操作成功,刚准备给RMQ一个确认消息,此时突然断电,或者网络抖动,使得这条确认消息没有发送出去。此时RMQ中的那条转账事务消息,始终处于prepare状态,消费者读取不到,但是却已经完成一方的账户资金变动!!!

2) 问题解决思路

既然,RMQ3.2.6版本不为我们进行回查,那么只能由我们自己完成了。
重新看一下“业务和消息解耦”的那张转账流程的图:

技术图片

 

 在正常情况下,当然没有问题,如果第五步(向MQ发送确认消息)出现失败,加上RocketMQ 3.2.6版本没有事务回查机制,就会导致这条转账消息,在A银行完成了操作,但是迟迟对B银行系统不可见!

技术图片

 

 

用户U1从A银行系统转账给B银行系统的用户U2的处理过程如下:
 
A银行系统生成一条转账消息,以事务消息的方式写入RocketMQ,此时B银行系统不可见这条消息
 
写入MQ成功后,回调A银行系统,对T1,T2表进行操作(很显然需要是一个事务)
 
我们重点关注下T2表,这个表是用来干嘛的呢?每条转账消息都会在T2表中,该表有2个特殊的字段:status,updatetime。(用途会在后文详述)
完成第二步,接下来发送确认消息给MQ,如果这个确认消息发送成功,那么这条转账消息,将对B银行系统可见。然后B银行系统,会在一个事务中完成对t3,t5的操作。
 
如果发送确认消息给MQ失败的处理思路:
 
首先,B银行系统,有一个定时任务(比如说每隔1MIN执行一次),扫描表t5,取得一段时间内的数据,发送给A银行系统。要知道t5中的数据,必然是A银行系统成功处理并发送确认消息成功的转账数据。为什么要发送给A银行系统呢,其实就是为了找到那些发送确认消息失败的转账数据。那么怎么发给A银行系统呢,这个方式比较多,可以考虑在来一个Topic,也可以考虑Netty等。发送给A银行系统,其实就是为了更新t2表的status,updatetime。
 
这里有一个关键,如何“扫描表t5,取得一段时间内的数据”?这就是t4的作用,在t4中记录一个time字段,每次定时任务启动,先更新time(比如设定为当前系统时间,设置前的的时间为old),然后扫描出t5中大于这个old时间的转账数据,如此循环往复。
 
其次,A银行系统,也有一个定时任务(可以根据业务消费能力定,可以大一些),扫描t2表(指定status及updatetime条件),将那些确认消息发送失败的转账消息找出来,更新updatetime并发送给MQ。
 
这样,我们并没有改动RocketMQ 3.2.6的源码,而是在外围解决了事务回查!
 
其实到这里,你可以发现RocketMQ的一个特点,就是将生产者和MQ绑定,而不需要特别处理消费者,这是为什么呢?
 
因为消息只要发往RocketMQ成功,那么就意味着成功,为什么这么说?前面,我们说过,消费者端消费消息只会产生2种错误,第一:timeout,第二:exception。要知道RocketMQ对于超时,会不断重试;对于消费异常,会根据消费端的返回码,会有重试机制保证。也就是,RocketMQ一定会让消息得到消费,如果消费有问题,只能是消费者的问题,而不会是RocketMQ的问题!
 
我这儿整理了比较全面的JAVA相关的面试资料,
技术图片
需要领取面试资料的同学,请加群:473984645
技术图片

以上是关于RocketMQ——Transaction Message(事务消息)的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ-事务消费

RocketMQ源码分析之RocketMQ事务消息实现原下篇(事务提交或回滚)

转:KafkaRabbitMQRocketMQ消息中间件的对比 —— 消息发送性能 (阿里中间件团队博客)

机加工行业MES系统模具行业MES系统CNCl中工行业MES系统MES扫码报工MES数据采集

MES系统学习

部署前要知道:离散型MES和流程型MES的区别