RocketMQ使用事务消息
Posted 乐观男孩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ使用事务消息相关的知识,希望对你有一定的参考价值。
说明
事务消息:
1、不支持延时消息和批量消息
2、如果消息没有及时提交,默认check 15次,可以通过Broker的transactionCheckMax参数配置次数。
如果超时15次依然没有得到明确结果,将会打印异常信息,具体的处理策略可以通过复写AbstractTransactionCheckListener类实现
3、每次check的时间间隔可以通过Broker的transactionTimeout配置,也可以在消息中增加CHECK_IMMUNITY_TIME_IN_SECONDS属性指定
4、事务状态:LocalTransactionState.COMMIT_MESSAGE、LocalTransactionState.ROLLBACK_MESSAGE、LocalTransactionState.UNKNOW。
原理
事务消息是RocketMQ的一大特性,其保证发送消息和执行本地逻辑在同一个事务内。实现的思路借鉴了两阶段提交协议:
第一阶段:发送半事务消息,消息发送后,消息是对消费者透明的,也就是该消息还不属于可消费消息,消费者无法消费。
第二阶段:执行本地事务,本地执行事务后提交消息。
(1)、如果事务执行失败,则回滚消息;
(2)、如果事务执行成功,则提交消息,提交后消费者可消费到消息;
(3)、如果事务执行成功,但消息提交失败,RocketMQ还提供了回查机制:如果一段时间过后,没有提交/回滚半事务消息,RocketMQ会定时回查一定的次数,获取本地事务的状态以决定是提交还是回滚消息。如果回查一定的次数后依然没有获取到本地事务的明确状态,则消息会被放到死信队列,由人工确认如何处理。
事务消息处理流程
1、生产端发送半事务消息到服务端
2、服务端返回半事务消息发送成功响应。注意,此时的消息对消费端是不可见的,不可被消费
3、发送方执行本地事务
4、执行完本地事务后,客户端同步服务端提交/回滚消息
5、如果服务端在一定的时间内,等不到4的回应,则定时进行回查,询问客户端的本地事务状态。
6、客户端检查本地事务状态
7、根据本地事务执行情况,告知服务端,服务端决定是提交消息还是丢弃消息。
生产端
@Test
public void sendMessage() throws Exception
//事务生产者
TransactionMQProducer producer = new TransactionMQProducer("defaultGroup");
producer.setNamesrvAddr(SpringUtil.getBean(RocketMqConfig.class).getNamesrvAddr());
//设置检查本地事务状态的线程池
//producer.setExecutorService(null);
//本地事务执行监听器
TransactionListener transactionListener = new TransactionListenerImpl();
producer.setTransactionListener(transactionListener);
producer.start();
Message message = new Message(RocketMqUtil.TOPIC, "transaction", "transaction-message".getBytes(Charset.forName("UTF-8")));
//发送事务消息
producer.sendMessageInTransaction(message, null);
class TransactionListenerImpl implements TransactionListener
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg)
//执行本地事务(数据库)操作......
int num = new Random().nextInt(10);
if (num < 3)
//本地事务执行成功,提交消息
return LocalTransactionState.COMMIT_MESSAGE;
else if (num < 6)
//本地事务执行失败,删除消息
return LocalTransactionState.ROLLBACK_MESSAGE;
//等待本地事务check,即执行checkLocalTransaction()方法
return LocalTransactionState.UNKNOW;
/**
* 回查逻辑
* @param msg
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg)
int num = new Random().nextInt(10);
if (num < 3)
//提交消息
return LocalTransactionState.COMMIT_MESSAGE;
else if (num < 6)
//删除消息
return LocalTransactionState.ROLLBACK_MESSAGE;
return LocalTransactionState.UNKNOW;
发送事务消息步骤:
1、初始化TransactionMQProducer实例
2、指定check线程池(回查线程池)
3、为Producer添加自定义事务监听器。自定义事务监听器需实现TransactionListener接口,通过覆盖接口的executeLocalTransaction方法执行本地事务,返回事务状态,客户端会根据本地事务状态通知服务端,决定是否提交消息;通过覆盖接口的checkLocalTransaction方法提供回查机制,当在一定的时间内服务端获取不到本地事务执行状态,将通过该方法回查事务状态,以决定消失是否需要提交。
4、通过Producer.sendMessageInTransaction发送事务消息。
消费者正常消费逻辑
消费端
@Test
public void consumeMessage() throws Exception
DefaultMQPushConsumer defaultMQPushConsumer = RocketMqUtil.getDefaultMQPushConsumer();
defaultMQPushConsumer.subscribe(RocketMqUtil.TOPIC, "*");
defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently()
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
ConsumeConcurrentlyContext consumeConcurrentlyContext)
log.info("消费到消息条数:", list.size());
list.stream().map(messageExt -> new String(messageExt.getBody(), Charset.forName("UTF-8")))
.map(String::new).forEach(System.out::println);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
defaultMQPushConsumer.start();
Thread.sleep(5000L);
消费端正常消费消息即可。
以上是关于RocketMQ使用事务消息的主要内容,如果未能解决你的问题,请参考以下文章