11 SpringBoot整合RocketMQ实现事务消息

Posted java1234_小锋

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了11 SpringBoot整合RocketMQ实现事务消息相关的知识,希望对你有一定的参考价值。

事务消息是RocketMQ提供的非常重要的一个特性,在4.x版本之后开源,可以利用事务消息轻松地实现分布式事务。

RocketMQ在其消息定义的基础上,对事务消息扩展了两个相关的概念:

Half(Prepare) Message——半消息(预处理消息)

半消息是一种特殊的消息类型,该状态的消息暂时不能被Consumer消费。当一条事务消息被成功投递到Broker上,但是Broker并没有接收到Producer发出的二次确认时,该事务消息就处于"暂时不可被消费"状态,该状态的事务消息被称为半消息。

Message Status Check——消息状态回查

由于网络抖动、Producer重启等原因,可能导致Producer向Broker发送的二次确认消息没有成功送达。如果Broker检测到某条事务消息长时间处于半消息状态,则会主动向Producer端发起回查操作,查询该事务消息在Producer端的事务状态(Commit 或 Rollback)。可以看出,Message Status Check主要用来解决分布式事务中的超时问题。

执行流程:

  1. 应用模块遇到要发送事务消息的场景时,先发送prepare消息给MQ。
  2. prepare消息发送成功后,应用模块执行数据库事务(本地事务)。
  3. 根据数据库事务执行的结果,再返回Commit或Rollback给MQ。
  4. 如果是Commit,MQ把消息下发给Consumer端,如果是Rollback,直接删掉prepare消息。
  5. 第3步的执行结果如果没响应,或是超时的,启动定时任务回查事务状态(最多重试15次,超过了默认丢弃此消息),处理结果同第4步。
  6. MQ消费的成功机制由MQ自己保证。

具体实例:

通过rocketMQTemplatesendMessageInTransaction方法发送事务消息

/**
 * 发送事务消息
 */
public void sendTransactionMessage(){
	// 构造消息
	Message msg = MessageBuilder.withPayload("rocketmq事务消息-01").build();
	rocketMQTemplate.sendMessageInTransaction("java1234-transaction-rocketmq",msg,null);
}

定义本地事务处理类,实现RocketMQLocalTransactionListener接口,以及加上@RocketMQTransactionListener注解,这个类似方法的调用是异步的;

executeLocalTransaction方法,当我们处理完业务后,可以根据业务处理情况,返回事务执行状态,有bollback, commit or unknown三种,分别是回滚事务,提交事务和未知;根据事务消息执行流程,如果返回bollback,则直接丢弃消息;如果是返回commit,则消费消息;如果是unknow,则继续等待,然后调用checkLocalTransaction方法,最多重试15次,超过了默认丢弃此消息;

checkLocalTransaction方法,是当MQ Server未得到MQ发送方应答,或者超时的情况,或者应答是unknown的情况,调用此方法进行检查确认,返回值和上面的方法一样;

@RocketMQTransactionListener
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
	@Override
	public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
		// ... local transaction process, return bollback, commit or unknown
		System.out.println("executeLocalTransaction");
		return RocketMQLocalTransactionState.UNKNOWN;
	}

	@Override
	public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
		// ... check transaction status and return bollback, commit or unknown
		System.out.println("checkLocalTransaction");
		return RocketMQLocalTransactionState.COMMIT;
	}
}

运行:

生产者端两个方法都执行到了,

消费端也获取到了消息;

执行如下:

生产者端发送half消息到MQ-SERVER,然后异步执行executeLocalTransaction方法,返回unknown,MQ-SERVER接收到unknown后,继续等待,然后再执行checkLocalTransaction确认,返回commit,MQ-SERVER得到commit后,消费端才可以消费消息;

说明:这个是锋哥的RocketMQ备课笔记,等备课完,会发布配套的视频教程,如有需要,可以先加锋哥WX:java1239 欢迎白嫖
没问题!

微信搜一搜公众号【java1234】关注这个放荡不羁的程序员,关注后回复【资料】有我准备的一线大厂笔试面试资料以及简历模板。

以上是关于11 SpringBoot整合RocketMQ实现事务消息的主要内容,如果未能解决你的问题,请参考以下文章

SpringBoot(17)---SpringBoot整合RocketMQ

4 SpringBoot整合RocketMQ实现消息发送和接收

SpringBoot整合RocketMQ

Springboot 整合 rocketmq及调度方案实现

9 SpringBoot整合RocketMQ实现顺序消息

10 SpringBoot整合RocketMQ实现延迟消息