RocketMQ(09)——发送事务消息

Posted elim168

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ(09)——发送事务消息相关的知识,希望对你有一定的参考价值。

发送事务消息

RocketMQ支持发送事务消息,它的事务消息是基于二阶段提交机制实现的。当发送的消息是事务消息时,只有对应的消息被提交了才能被消费者进行消费。发送事务消息时生产者需要使用TransactionMQProducer,它还需要指定一个TransactionListener。TransactionListener接口的定义如下。

public interface TransactionListener 
    /**
     * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
     *
     * @param msg Half(prepare) message
     * @param arg Custom business parameter
     * @return Transaction state
     */
    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

    /**
     * When no response to prepare(half) message. broker will send check message to check the transaction status, and this
     * method will be invoked to get local transaction status.
     *
     * @param msg Check message
     * @return Transaction state
     */
    LocalTransactionState checkLocalTransaction(final MessageExt msg);

如你所见,TransactionListener定义了两个接口方法,当消息发送到了Broker后,会回调其executeLocalTransaction(),根据返回状态来决定是要提交事务还是回滚事务。LocalTransactionState有三种状态,COMMIT_MESSAGE、ROLLBACK_MESSAGE和UNKNOW。前两种状态很明显,表示提交和回滚。当在executeLocalTransaction()中还不能确定消息是要提交还是回滚时,即可以返回UNKNOW,表示不清楚。这样Broker会发送check消息过来检测事务的状态,到了生产者这里的表现就是回调TransactionListener的checkLocalTransaction()方法。下面是一个使用TransactionMQProducer发送事务消息的示例。

@Test
public void transactionalSend() throws Exception 
  TransactionMQProducer producer = new TransactionMQProducer("group1");
  producer.setNamesrvAddr(this.nameServer);
  producer.setTransactionListener(new TransactionListener() 
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) 
      Integer attach = (Integer) arg;
      if (attach % 6 == 0) 
        return LocalTransactionState.COMMIT_MESSAGE;
       else if (attach % 4 == 0) 
        return LocalTransactionState.ROLLBACK_MESSAGE;
      
      return LocalTransactionState.UNKNOW;
    

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) 
      int i = Integer.parseInt(new String(msg.getBody()));
      System.out.println("checkLocalTransaction ----" + i);
      if (i % 10 == 0) 
        return LocalTransactionState.COMMIT_MESSAGE;
       else if (i % 7 == 0) 
        return LocalTransactionState.UNKNOW;
      
      return LocalTransactionState.ROLLBACK_MESSAGE;
    
  );
  producer.start();
  for (int i=0; i<100; i++) 
    Message message = new Message("topic1", "transactional", String.valueOf(i).getBytes());
    producer.sendMessageInTransaction(message, i);
  
  TimeUnit.SECONDS.sleep(60);
  producer.shutdown();

如上,在调用TransactionMQProducer的sendMessageInTransaction()时还可以传递一个业务参数,该参数将作为TransactionListener的executeLocalTransaction()的第二个参数。按照上面的代码只有发送消息的顺序(从0开始算)是6的倍数或者是非4的倍数但是10的倍数的消息才会提交事务,对应的消息才能被消费者进行消费。

(注:本文是基于RocketMQ4.5.0所写)

以上是关于RocketMQ(09)——发送事务消息的主要内容,如果未能解决你的问题,请参考以下文章

rocketmq源码分析:事务消息延时消息消息查询

RocketMQ事务消息机制

RocketMQ 整合SpringBoot发送事务消息

RocketMQ事务消息

RocketMQ事务消息

RocketMQ(02)——发送消息的三种方式