RocketMQ的分布式事务机制(事务消息)
Posted 刘Java
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ的分布式事务机制(事务消息)相关的知识,希望对你有一定的参考价值。
详细介绍了RocketMQ的事务消息机制,RocketMQ的事务消息可以用于实现基于可靠消息的最终一致性的分布式事务。
分布式事务常用于保证两个独立的系统之间的数据或者状态的一致性,常见的方案有TCC(Try-Confirm-Cancel),XA两阶段提交方案,可靠消息最终一致性方案,最大努力通知方案等等,最常见的就是最终一致性方案。
使用案例:https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md#6-%E6%B6%88%E6%81%AF%E4%BA%8B%E5%8A%A1%E6%A0%B7%E4%BE%8B,使用TransactionMQProducer作为事务消息生产者,通过sendMessageInTransaction方法发送事务消息,通过设置TransactionListener(executeLocalTransaction执行本地事务、checkLocalTransaction检查本地事务状态)执行回调,可以通过setExecutorService设置自定义线程池来处理这些检查请求。
每个事务消息都有唯一ID,在TransactionListener中,可以通过事务id来使得执行本地事务与检查本地事务这两个操作操作产生联系。这两个方法返回LocalTransactionState.COMMIT_MESSAGE、LocalTransactionState.ROLLBACK_MESSAGE、LocalTransactionState.UNKNOW这三种状态。
事务消息不支持延时消息和批量消息。设置了DelayTimeLevel后,数据事务提交后(或是回查数据库事务完成后),将消息写入目标Topic时,由于DelayTimeLevel的干扰,目标Topic将变成SCHEDULE_TOPIC_XXXX,同时REAL_TOPIC变成RMQ_SYS_TRANS_HALF_TOPIC,真实的Topic在这个环节已经丢失。
1 事务消息简要流程
Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ采用了2PC两阶段提交的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,用以达到消息的最终一致性的目的,并不是强一致性的分布式事务。
下面是事务消息的流程图:
注意,RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败,通过Rocket+MQ的事务消息可以实现可靠消息最终一致性方案的分布式事务。
假设有两个独立部署的系统A、B,A系统提供下单服务,B系统提供扣款服务,下单之后必须扣款,它们的数据库也是独立部署的,这就是一个典型的分布式事务的场景。下面看看RocketMQ的事务消息如何实现最终一致性的分布式事务。
由于是基于2PC的思想,RocketMQ事务消息同样分为两个阶段:Prepared阶段和确认阶段:
- Prepared阶段,首先Producer发送一个half message(也称为半消息)给RocketMQ。这个半消息区别于普通消息,即使消息被成功发送到了Broker端,也不会立即可见,需要 Producer对消息的二次确认后,Consumer才可能去消费它。
- 随后RocketMQ服务端响应半消息的写入结果,如果是写入成功,那么执行Producer(A系统)的本地事务,如果响应写入失败,此时本地事务逻辑不执行即可。
- 确认阶段,Producer(A系统)的本地事务执行可能成功或者失败,Producer将根据结果返回一个COMMIT或者ROLLBACK状态给RocketMQ,RocketMQ收到这个再确认消息之后,对半消息也执行COMMIT或者ROLLBACK,如果是COMMIT,那么半消息对Consumer(B系统)可见(可以消费),如果是ROLLBACK,那么半消息被“回滚”,Consumer永不可见(永不可消费)。
补偿机制:在第二阶段中,如果RocketMQ迟迟收不到Producer的返回结果,即这条半消息的状态一直是pending,则会从服务端发起一次“回查”调用。Producer收到回查消息,检查回查消息对应的本地事务的状态。根据本地事务状态,重新Commit或者Rollback。
可以知道,补偿阶段用于解决确认阶段Producer的消息Commit或者Rollback发生超时或者失败(比如确认阶段Producer挂了)的情况。
这里还有几个问题或者技术点需要深入的了解一下。
2 一阶段半消息不可见的设计
RocketMQ会判断写入的如果事务消息,则RocketMQ对消息的Topic和Queue等属性进行替换,改变主题为RMQ_SYS_TRANS_HALF_TOPIC
,同时将原来的Topic和Queue
信息存储到消息的属性中。
因为消息主题被替换,故消息并不会转发到该原主题的消息消费队列,并且由于消费组未订阅该替换的主题,故而消费者无法感知消息的存在,不会消费。
随后RocketMQ会开启一个定时任务,从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。
其实改变消息主题是RocketMQ的常用“套路”,RocketMQ的延时消息的实现机制也是这个逻辑,非常的巧妙。
3 二阶段Commit和Rollback操作
在完成一阶段写入一条对用户不可见的消息后,二阶段如果是Commit操作,则需要让消息对用户可见;如果是Rollback则需要撤销一阶段的消息。
对于Rollback,本身一阶段的消息对用户是不可见的,其实不需要真正撤销消息(实际上RocketMQ也无法去真正的删除一条消息,因为是顺序写文件的)。但是为了区别于这条消息没有确定的状态(Pending),需要一个操作来标识这条消息的最终状态。
RocketMQ事务消息方案中引入了Op消息的概念,用Op消息标识事务消息已经确定的状态(Commit或者Rollback)。如果一条事务消息没有对应的Op消息,说明这个事务的状态还无法确定(可能是二阶段失败了)。
引入Op消息后,事务消息无论是Commit还是RollBack都会记录一个Op操作。只不过Commit相对于Rollback只是在写入Op消息前多了一步创建Half消息的索引的过程,即根据Half消息恢复出了以前的普通消息并在内部走了一遍普通的发送的流程,这样消费者就能看到普通消息了。
4 Op消息的设计
RocketMQ将Op消息写入到另一个特定的全局内部Topic中,像Half消息的Topic一样,不会被用户消费。Op消息的内容为对应的Half消息的存储的Offset,这样通过Op消息能索引到Half消息进行后续的回查操作。
5 Commit消息变得可见
在执行二阶段Commit操作时,需要构建出Half消息的索引,让消息变得对生产者可见。一阶段的Half消息由于是写到一个特殊的Topic,所以二阶段构建索引时需要读取出Half消息,并将Topic和Queue替换成真正的目标的Topic和Queue,之后通过一次普通消息的写入操作来生成一条对用户可见的消息。
所以RocketMQ事务消息二阶段其实是利用了一阶段存储的消息的内容,在二阶段时恢复出一条完整的普通消息,然后在内部走一遍消息写入流程即可使得Half消息对客户端可见。
6 消息回查
如果在RocketMQ事务消息的二阶段过程中失败了,例如在做Commit操作时,出现网络问题导致Commit失败,那么需要通过一定的策略使这条消息最终被Commit。RocketMQ采用了一种补偿机制,称为“回查”。
Broker端对未确定状态的消息发起回查,将消息发送到对应的Producer端(同一个Group的Producer),由Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback。Broker端通过对比Half消息和Op消息进行事务消息的回查。
RocketMQ的Broker端会开启一个定时任务(1分钟一次),从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取消息进行消费,通过对比Half消息和Op消息对未确定状态的事务消息发起回查事务状态的请求。
事务消息将在 Broker 配置文件中的参数 transactionTimeout
(默认为6s?)这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。
回查时将消息发送到对应的Producer端(同一个Group的Producer),由Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback。Broker根据返回的事务状态来决定是提交或回滚消息。
多次回查可能都不会成功,因此RocketMQ并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,比如Producer客户端崩溃的时候,那么rocketmq默认回滚该消息,此时可能出现本地事务成功而消息回滚的局面,那么事务的最终一致性也将无法保证。
7 最终一致性
RocketMQ的事务消息仅仅保证本地事务和MQ消息发送到消息队列形成原子性,它们才是同一个事务,但不保证消费者是否能一定消费成功。
那消费者消费失败怎么办?因为消费端RocketMQ有重试机制,如果不是代码问题一般重试几次就能成功,这里我们要保证消息消费的幂等性,即多次消费同一个消息对系统的状态没有影响,或者说不会影响最终正确的结果。比如上面的案例中,发生了重复消费,可能就会重复调用多次扣款的接口,我们要保证对同一个消息多次调用和一次调用的最终结果是一致的,而不是调用几次接口就扣款几次。
如果消费者一直执行失败,几乎可以断定就是代码有问题所以才引起的异常。如果多次失败并重试达到一定次数之后,可以先将该异常记录下来,通常是记录到数据库中,后续由人工处理,通过这样来让事务达到最终的一致性。
因此RocketMQ的事务消息不是强一致性的,而是保证最终一致性,并且可能需要人工介入。
目前,生产级别采用的各种分布式事务解决方案也几乎都是最终一致性的。试想一下,如果要保证强一致性的,即必须实时的保证数据的一致性,那么一定需要同步阻塞,此时将会阻塞大量的服务,降低消息分布式系统的可用性和并发度,这是更加不可容忍的。实际上也有强一致性的分布式事务方案,比如基于数据库的2PC实现,但是几乎很少使用,或者说,建议小公司谨慎使用分布式事务,能不用就不用。
与最终一致性对应的业务是,通常在客户进行操作之后,不会立即返回客户成功的信号,而是返回一个“业务正在办理中,成功了会通知你”、“钱款两小时内到账”等友好的延时提醒。
相关文章:
如有需要交流,或者文章有误,请直接留言。另外希望点赞、收藏、关注,我将不间断更新各种Java学习博客!
以上是关于RocketMQ的分布式事务机制(事务消息)的主要内容,如果未能解决你的问题,请参考以下文章
分布式事务中使用RocketMQ的事务消息机制优化事务的处理逻辑