RocketMQ(09)——发送事务消息
Posted elim168
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ(09)——发送事务消息相关的知识,希望对你有一定的参考价值。
发送事务消息
RocketMQ支持发送事务消息,它的事务消息是基于二阶段提交机制实现的。当发送的消息是事务消息时,只有对应的消息被提交了才能被消费者进行消费。发送事务消息时生产者需要使用TransactionMQProducer
,它还需要指定一个TransactionListener
。TransactionListener接口的定义如下。
public interface TransactionListener
/**
* When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
*
* @param msg Half(prepare) message
* @param arg Custom business parameter
* @return Transaction state
*/
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
/**
* When no response to prepare(half) message. broker will send check message to check the transaction status, and this
* method will be invoked to get local transaction status.
*
* @param msg Check message
* @return Transaction state
*/
LocalTransactionState checkLocalTransaction(final MessageExt msg);
如你所见,TransactionListener定义了两个接口方法,当消息发送到了Broker后,会回调其executeLocalTransaction()
,根据返回状态来决定是要提交事务还是回滚事务。LocalTransactionState有三种状态,COMMIT_MESSAGE、ROLLBACK_MESSAGE和UNKNOW。前两种状态很明显,表示提交和回滚。当在executeLocalTransaction()
中还不能确定消息是要提交还是回滚时,即可以返回UNKNOW,表示不清楚。这样Broker会发送check消息过来检测事务的状态,到了生产者这里的表现就是回调TransactionListener的checkLocalTransaction()
方法。下面是一个使用TransactionMQProducer发送事务消息的示例。
@Test
public void transactionalSend() throws Exception
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr(this.nameServer);
producer.setTransactionListener(new TransactionListener()
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg)
Integer attach = (Integer) arg;
if (attach % 6 == 0)
return LocalTransactionState.COMMIT_MESSAGE;
else if (attach % 4 == 0)
return LocalTransactionState.ROLLBACK_MESSAGE;
return LocalTransactionState.UNKNOW;
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg)
int i = Integer.parseInt(new String(msg.getBody()));
System.out.println("checkLocalTransaction ----" + i);
if (i % 10 == 0)
return LocalTransactionState.COMMIT_MESSAGE;
else if (i % 7 == 0)
return LocalTransactionState.UNKNOW;
return LocalTransactionState.ROLLBACK_MESSAGE;
);
producer.start();
for (int i=0; i<100; i++)
Message message = new Message("topic1", "transactional", String.valueOf(i).getBytes());
producer.sendMessageInTransaction(message, i);
TimeUnit.SECONDS.sleep(60);
producer.shutdown();
如上,在调用TransactionMQProducer的sendMessageInTransaction()
时还可以传递一个业务参数,该参数将作为TransactionListener的executeLocalTransaction()
的第二个参数。按照上面的代码只有发送消息的顺序(从0开始算)是6的倍数或者是非4的倍数但是10的倍数的消息才会提交事务,对应的消息才能被消费者进行消费。
(注:本文是基于RocketMQ4.5.0所写)
以上是关于RocketMQ(09)——发送事务消息的主要内容,如果未能解决你的问题,请参考以下文章