RocketMQ使用事务消息

Posted 乐观男孩

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ使用事务消息相关的知识,希望对你有一定的参考价值。

目录

说明

事务消息:
1、不支持延时消息和批量消息
2、如果消息没有及时提交,默认check 15次,可以通过Broker的transactionCheckMax参数配置次数。
如果超时15次依然没有得到明确结果,将会打印异常信息,具体的处理策略可以通过复写AbstractTransactionCheckListener类实现
3、每次check的时间间隔可以通过Broker的transactionTimeout配置,也可以在消息中增加CHECK_IMMUNITY_TIME_IN_SECONDS属性指定
4、事务状态:LocalTransactionState.COMMIT_MESSAGE、LocalTransactionState.ROLLBACK_MESSAGE、LocalTransactionState.UNKNOW。

原理

事务消息是RocketMQ的一大特性,其保证发送消息和执行本地逻辑在同一个事务内。实现的思路借鉴了两阶段提交协议:
第一阶段:发送半事务消息,消息发送后,消息是对消费者透明的,也就是该消息还不属于可消费消息,消费者无法消费。
第二阶段:执行本地事务,本地执行事务后提交消息。
(1)、如果事务执行失败,则回滚消息;
(2)、如果事务执行成功,则提交消息,提交后消费者可消费到消息;
(3)、如果事务执行成功,但消息提交失败,RocketMQ还提供了回查机制:如果一段时间过后,没有提交/回滚半事务消息,RocketMQ会定时回查一定的次数,获取本地事务的状态以决定是提交还是回滚消息。如果回查一定的次数后依然没有获取到本地事务的明确状态,则消息会被放到死信队列,由人工确认如何处理。

事务消息处理流程


1、生产端发送半事务消息到服务端
2、服务端返回半事务消息发送成功响应。注意,此时的消息对消费端是不可见的,不可被消费
3、发送方执行本地事务
4、执行完本地事务后,客户端同步服务端提交/回滚消息
5、如果服务端在一定的时间内,等不到4的回应,则定时进行回查,询问客户端的本地事务状态。
6、客户端检查本地事务状态
7、根据本地事务执行情况,告知服务端,服务端决定是提交消息还是丢弃消息。

生产端

@Test
    public void sendMessage() throws Exception 
        //事务生产者
        TransactionMQProducer producer = new TransactionMQProducer("defaultGroup");
        producer.setNamesrvAddr(SpringUtil.getBean(RocketMqConfig.class).getNamesrvAddr());
        //设置检查本地事务状态的线程池
        //producer.setExecutorService(null);
        //本地事务执行监听器
        TransactionListener transactionListener = new TransactionListenerImpl();
        producer.setTransactionListener(transactionListener);
        producer.start();
        Message message = new Message(RocketMqUtil.TOPIC, "transaction", "transaction-message".getBytes(Charset.forName("UTF-8")));
        //发送事务消息
        producer.sendMessageInTransaction(message, null);
    

    class TransactionListenerImpl implements TransactionListener 

        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) 
            //执行本地事务(数据库)操作......
            int num = new Random().nextInt(10);
            if (num < 3) 
                //本地事务执行成功,提交消息
                return LocalTransactionState.COMMIT_MESSAGE;
             else if (num < 6) 
                //本地事务执行失败,删除消息
                return LocalTransactionState.ROLLBACK_MESSAGE;
            
            //等待本地事务check,即执行checkLocalTransaction()方法
            return LocalTransactionState.UNKNOW;
        

        /**
         * 回查逻辑
         * @param msg
         * @return
         */
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) 
            int num = new Random().nextInt(10);
            if (num < 3) 
                //提交消息
                return LocalTransactionState.COMMIT_MESSAGE;
             else if (num < 6) 
                //删除消息
                return LocalTransactionState.ROLLBACK_MESSAGE;
            
            return LocalTransactionState.UNKNOW;
        
    

发送事务消息步骤:
1、初始化TransactionMQProducer实例
2、指定check线程池(回查线程池)
3、为Producer添加自定义事务监听器。自定义事务监听器需实现TransactionListener接口,通过覆盖接口的executeLocalTransaction方法执行本地事务,返回事务状态,客户端会根据本地事务状态通知服务端,决定是否提交消息;通过覆盖接口的checkLocalTransaction方法提供回查机制,当在一定的时间内服务端获取不到本地事务执行状态,将通过该方法回查事务状态,以决定消失是否需要提交。
4、通过Producer.sendMessageInTransaction发送事务消息。
消费者正常消费逻辑

消费端

@Test
    public void consumeMessage() throws Exception 
        DefaultMQPushConsumer defaultMQPushConsumer = RocketMqUtil.getDefaultMQPushConsumer();
        defaultMQPushConsumer.subscribe(RocketMqUtil.TOPIC, "*");
        defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() 
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
                                                            ConsumeConcurrentlyContext consumeConcurrentlyContext) 
                log.info("消费到消息条数:", list.size());
                list.stream().map(messageExt -> new String(messageExt.getBody(), Charset.forName("UTF-8")))
                        .map(String::new).forEach(System.out::println);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            
        );
        defaultMQPushConsumer.start();
        Thread.sleep(5000L);
    

消费端正常消费消息即可。

以上是关于RocketMQ使用事务消息的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ的死信队列

RocketMQ的死信队列

RocketMQ - 如何用死信队列解决消费者异常

消息队列 - 死信、延迟、重试队列

RocketMQ源码(23)—DefaultMQPushConsumer消费者重试消息和死信消息源码

RocketMQ 死信队列 | 消费者出现异常如何处理?