RocketMQ实现事务消息方案
Posted liudalei
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ实现事务消息方案相关的知识,希望对你有一定的参考价值。
RocketMQ 是一个来自阿里巴巴的分布式消息中间件,于 2012 年开源,并在 2017 年正式成为 Apache 顶级项目。据了解,包括阿里云上的消息产品以及收购的子公司在内,阿里集团的消息产品全线都运行在 RocketMQ 之上,并且最近几年的双十一大促中,RocketMQ 都有抢眼表现。Apache RocketMQ 4.3之后的版本正式支持事务消息,为分布式事务实现提供了便利性支持。
RocketMQ 事务消息设计则主要是为了解决 Producer 端的消息发送与本地事务执行的原子性问题,RocketMQ 的设计中 broker 与 producer 端的双向通信能力,使得 broker 天生可以作为一个事务协调者存在;而 RocketMQ 本身提供的存储机制为事务消息提供了持久化能力;RocketMQ 的高可用机制以及可靠消息设计则为事务消息在系统发生异常时依然能够保证达成事务的最终一致性。
在RocketMQ 4.3后实现了完整的事务消息,实际上其实是对本地消息表的一个封装,将本地消息表移动到了MQ内部,解决 Producer 端的消息发送与本地事务执行的原子性问题。
执行流程如下:
为方便理解我们还以注册送积分的例子来描述 整个流程。
Producer 即MQ发送方,本例中是用户服务,负责新增用户。MQ订阅方即消息消费方,本例中是积分服务,负责新增积分。
1、Producer 发送事务消息
Producer (MQ发送方)发送事务消息至MQ Server,MQ Server将消息状态标记为Prepared(预备状态),注意此时这条消息消费者(MQ订阅方)是无法消费到的。
本例中,Producer 发送 ”增加积分消息“ 到MQ Server。
2、MQ Server回应消息发送成功
MQ Server接收到Producer 发送给的消息则回应发送成功表示MQ已接收到消息。
3、Producer 执行本地事务
Producer 端执行业务代码逻辑,通过本地数据库事务控制。
本例中,Producer 执行添加用户操作。
4、消息投递
若Producer 本地事务执行成功则自动向MQServer发送commit消息,MQ Server接收到commit消息后将”增加积分消息“ 状态标记为可消费,此时MQ订阅方(积分服务)即正常消费消息;
若Producer 本地事务执行失败则自动向MQServer发送rollback消息,MQ Server接收到rollback消息后 将删除”增加积分消息“ 。
MQ订阅方(积分服务)消费消息,消费成功则向MQ回应ack,否则将重复接收消息。这里ack默认自动回应,即程序执行正常则自动回应ack。
5、事务回查
如果执行Producer端本地事务过程中,执行端挂掉,或者超时,MQ Server将会不停的询问同组的其他 Producer来获取事务执行状态,这个过程叫事务回查。MQ Server会根据事务回查结果来决定是否投递消息。
以上主干流程已由RocketMQ实现,对用户侧来说,用户需要分别实现本地事务执行以及本地事务回查方法,因此只需关注本地事务的执行状态即可。
RoacketMQ提供RocketMQLocalTransactionListener接口:
public interface RocketMQLocalTransactionListener { /** - 发送prepare消息成功此方法被回调,该方法用于执行本地事务 - @param msg 回传的消息,利用transactionId即可获取到该消息的唯一Id - @param arg 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,这里能获取到 - @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调 */ RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg); /** - @param msg 通过获取transactionId来判断这条消息的本地事务执行状态 - @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调 */ RocketMQLocalTransactionState checkLocalTransaction(Message msg); }
发送事务消息:
以下是RocketMQ提供用于发送事务消息的API:
TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); //设置TransactionListener实现 producer.setTransactionListener(transactionListener); //发送事务消息 SendResult sendResult = producer.sendMessageInTransaction(msg, null);
下面给出代码示例
package com.example.rocketmq.common; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.TransactionSendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import javax.annotation.Resource; @Slf4j @Service public class SyncProducer { @Resource private RocketMQTemplate rocketMQTemplate; public TransactionSendResult sendSyncMessage(String msg, String topic, String tag){ log.info("【发送消息】:{}", msg); //构建消息体 JSONObject jsonObject = new JSONObject(); jsonObject.put("message",msg); Message<String> message = MessageBuilder.withPayload(jsonObject.toJSONString()).build(); TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("tx_group", topic, message, tag); log.info("【发送状态】:{}", result.getLocalTransactionState()); return result; } }
package com.example.rocketmq.common; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; import java.util.concurrent.ConcurrentHashMap; @Slf4j @Component @RocketMQTransactionListener(txProducerGroup = "tx_group") public class SyncProducerListener implements RocketMQLocalTransactionListener { private ConcurrentHashMap<String, Object> localTrans = new ConcurrentHashMap<>(); @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { try { log.info("【本地业务执行完毕】 msg:{}, Object:{}", message, o); localTrans.put(message.getHeaders().getId()+"", message.getPayload()); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { e.printStackTrace(); log.error("【执行本地业务异常】 exception message:{}", e.getMessage()); return RocketMQLocalTransactionState.ROLLBACK; } } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { log.info("【执行检查任务】"); return RocketMQLocalTransactionState.UNKNOWN; } }
package com.example.rocketmq.common; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; @Component @RocketMQMessageListener(topic = "TransactionTopic",consumerGroup = "spring_boot_consumer_group") @Slf4j public class TxmsgConsumer implements RocketMQListener<String> { @Override public void onMessage(String s) { log.info("开始消费消息:{}",s); } }
application.propertis
rocketmq.name-server=localhost:9876
rocketmq.producer.group = spring_boot_producer_group
pom.xml
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency>
测试:
package com.example.rocketmq.controller; import com.example.rocketmq.common.SyncProducer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class TestController { @Autowired private SyncProducer syncProducer; @GetMapping("test") String test() { syncProducer.sendSyncMessage("我随便传的一条测试消息,内容保密","TransactionTopic","TransactionTAG"); return "ok"; } }
结果:
以上是关于RocketMQ实现事务消息方案的主要内容,如果未能解决你的问题,请参考以下文章
27 发送消息零丢失方案:RocketMQ事务消息的实现流程分析