RocketMQ 事务消息 原理及使用方法解析

Posted 小王曾是少年

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ 事务消息 原理及使用方法解析相关的知识,希望对你有一定的参考价值。

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南


📆 最近更新:2023年3月24日

🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

RocketMQ事务消息

RocketMQ针对事务消息扩展了两个相关的概念:

1. 半消息

半消息(Half Message)是一种特殊的消息类型,处于这个状态的消息暂时不能被Consumer消费。

当一条事务消息被成功投递到Broker上,但Broker没有收到Producer的二次确认时,该事务消息就处于暂时不可消费的状态,这种消息就是半消息


2. 消息状态回查

由于网络抖动、系统宕机等等原因,可能导致ProducerBroker发送的二次确认信息没有送达。如果Broker检测到某条事务消息长时间处于半消息状态,则会主动向Producer端发起回查操作,查询该事务消息在Producer端的事务状态。

这个机制主要是用来解决分布式事务中的超时问题。


上图是RocketMQ官网提供的事务消息流程图,执行步骤如下:

  1. ProducerBroker端发送半消息
  2. Broker发送ACK确认,表示半消息发送成功
  3. Producer执行本地事务
  4. 本地事务完毕,根据事务的状态,ProducerBroker发送二次确认消息,确认该半消息的CommitRollback状态。Broker收到二次确认消息之后:如果是Commit状态,则直接将消息发送到Consumer端执行消费逻辑;如果是Rollback状态,则会直接将其标记为失败,不会发送给Consumer
  5. 针对超时情况,Broker主动向Producer发起消息回查
  6. Producer处理回查消息,返回对应的本地事务执行结果
  7. Broker针对消息回查的结果,执行【步骤4】的操作

适用场景举例

以转账系统为例,假设A要向B转账100元,执行本地事务和发送异步消息的过程应该同时保持成功或失败,即A的账户扣款成功后,就一定要发消息发送出去,最直观的思路可能有两个:

1. 先发消息

这种策略的流程如下:

存在的问题是: 如果消息发送成功,但后续A扣款失败了,消费端仍然会消费这条消息,进而向B账户里打钱,数据就出现不一致的情况了。


2. 后发消息


存在的问题是: 如果扣款成功,但是发送消息失败,就会出现A已经扣钱了,但B账户里没有入账的情况,同样也是无法接受的。

出现上述情况的根本原因是本地事务和发送消息这两个操作并不是原子的,因此也就无法做到同时失败或同时成功,所以数据一致性难以保障。


解决上述问题的方法就是上面提到的半消息

如上图所示,执行本地事务之前先发送一个半消息,此时还不能被消费者消费,只有当本地事务执行完毕并发送二次确认消息之后,半消息才能被Consumer消费。

如此以来就保证了多个系统数据的数据一致性,前提是系统不需要保证数据的强一致性


使用示例

发送事务消息

RocketMQ发送事务消息设计到消息发送、消息回查、消息二次确认等过程,因此这个过程可能会“稍显复杂”

发送事务消息使用的是TransactionMQProducer,一个简单的demo如下:

public class TransactionProducer 
    public static void main(String[] args) throws MQClientException 
        TransactionCheckListener transactionCheckListener = new TransactionCheckListener() 
            @Override
            public LocalTransactionState checkLocalTransactionState(MessageExt msg) 
                return null;
            
        ;

        TransactionMQProducer producer = new TransactionMQProducer("GROUP A");

        producer.setCheckThreadPoolMinSize(2);
        producer.setCheckThreadPoolMaxSize(2);
        producer.setCheckRequestHoldMax(2000);
        producer.setTransactionCheckListener(transactionCheckListener);
        producer.start();

        String[] tags = new String[]"TAG A", "TAG B", "TAG C";
        LocalTransactionExecuter transactionExecuter = new LocalTransactionExecuter() 
            @Override
            public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) 
                return null;
            
        ;
        for (int i = 0; i < 10; i++) 
            Message msg = new Message("TEST", tags[i % tags.length], "KEY " + i, ("HELLO, ROCKETMQ" + i).getBytes());
            SendResult result = producer.sendMessageInTransaction(msg, transactionExecuter, null);
            try 
                Thread.sleep(100);
             catch (InterruptedException e) 
                e.printStackTrace();
            
        
        producer.shutdown();
    


事务回查

checkLocalTransaction是事务消息回查监听方法,可以获取本地事务状态,根据事务的状态来确定是否要发送二次确认消息,或者进行事务回滚操作。

消息回查事务的状态由以下几种情况:

  1. LocalTransactionState.ROLLBACK_MESSAGE:事务回滚
  2. LocalTransactionState.COMMIT_MESSAGE:事务提交
  3. LocalTransactionState.UNKNOW:未知状态,此时Broker会定时重新查询Producer消息的状态,直到出现前面两种情况。
public interface TransactionListener 
    LocalTransactionState checkLocalTransaction(final MessageExt msg);


事务执行

executeLocalTransaction方法用于执行本地事务,如果本地事务执行成功则进行事务提交,否则进行事务回滚,如果是UNKNOW状态的话,Broker就会定时回查Producer的消息状态,直到彻底成功或失败。

public interface TransactionListener 
    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);


RocketMQ原理及解析

一.什么是消息队列

RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的顶级开源项目,具有高性能、高可靠、高实时、分布式特点。

官网地址:https://rocketmq.apache.org/

Producer
消息生产者,位于用户的进程内,Producer通过NameServer获取所有Broker的路由信息,根据负载均衡策略选择将消息发到哪个Broker,然后调用Broker接口提交消息。

Producer Group
生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组,类似于同一个服务的多个节点。

Consumer
消息消费者,位于用户进程内。Consumer通过NameServer获取所有broker的路由信息后,向Broker发送Pull请求来获取消息数据。Consumer可以以两种模式启动,广播(Broadcast)和集群(Cluster),广播模式下,一条消息会发送给所有Consumer,集群模式下消息只会发送给一个Consumer。

Consumer Group
消费者组,和生产者类似,消费同一类消息的多个 Consumer 实例组成一个消费者组。

Broker
Broker是RocketMQ的核心模块,负责接收并存储消息,同时提供Push/Pull接口来将消息发送给Consumer。Consumer可选择从Master或者Slave读取数据。多个主/从组成Broker集群,集群内的Master节点之间不做数据交互。

NameServer可以看作是RocketMQ的注册中心,它管理两部分数据:集群的Topic-Queue的路由配置;Broker的实时配置信息。其它模块通过Nameserv提供的接口获取最新的Topic配置和路由信息。

Topic
主题,表示一类消息的集合,Topic是消息队列订阅的基本单位

Message
代表一条消息,系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个Topic,拥有唯一的MessageID,且可以携带具有业务标识的Key。

Queue
消息队列,组成Topic的最小单元,Topic是逻辑概念,Queue是物理存储,Consumer在消费Topic消息时底层实际是拉取Queue的消息

Tag
标签可以被认为是对 Topic 进一步细化。主要用来区分和过滤消息,一般在相同业务模块中通过引入标签来标记不同用途的消息。

Offset
RocketMQ在存储消息时会为每个Topic下的每个Queue生成一个消息的索引文件,每个Queue都对应一个Offset记录当前Queue中消息条数。

二.为什么使用消息队列
1.应用接耦

微服务系统中,如果藕合调用支付服务,库存服务合物流服务,如果任何一个子服务出现问题都会造成下单操作失败,从而影响用户使用体验,当中间有消息队列做缓冲后,系统可用性就高多了,当物流系统发生故障恢复后,物流系统从消息队列中拉取消息,用户感知不到系统异常,同时服务间异步调用提升系统响应时间。
2.流量削峰
瞬间的请求如果直接打到数据库,可能会造成系统崩溃,引入消息队列后,服务根据自身处理能力消费消息。
3.消息分发
当基础服务基础数据发生修改时,其他使用数据的微服务可以订阅基础服务消息,同步更新修改。

4.分布式事务一致性
使用事务消息,既可以实现系统之间解耦,又能保证数据的最终一致性。

5.顺序收发
当业务场景需要异步按顺序调用其他微服务时,可以考虑使用异步消息

三.怎么使用消息队列
消息发送的三种方式:

同步:发送网络请求后同步等待Broker服务器的返回结果,支持发送失败后重试,适用于较重要的消息通知场景;

异步:异步发送网络请求,不会阻塞当前线程,不支持失败重试,适用于相应时间较高的场景;

单向:同异步,但不支持回调,适用于相应时间极短,可靠性不高的场景,如日志收集,用户操作记录等。

消息队列正确订阅关系

1.顺序消息

消息发送时保持顺序
消息被存储时保持和发送的顺序一致
消息被消费时保持和存储的顺序一致

Producer的投递策略,主要有以下三种投递策略


在消息发送时,需指定对应的MessageQueueSelector,此时我们只需通过业务ID与queue进行关联, send中的参数arg即为select中的arg,将订单号作为参数传入,同一业务ID的相关消息则可以保证在同一queue中。同时在消费者监听时设置监听模式为MessageListenerOrderly。

2.延时消息

RocketMQ目前指定的延时时间间隔有1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h,用等级来表示时间间隔。

应用场景可定时关单。

设置延时等级,大于0即为延时消息,大于最大等级则将延时等级修改为最大等级;
Broker默认会有一个延迟消息专属的Topic,下面有18个队列,每个延迟级别对应一个队列。如果Broker接收到的是延迟消息,会改写消息的Topic和queueId,将消息暂时统一写入延迟队列中,然后由ScheduleMessageService线程对延迟队进行扫描,将到期需要交付的消息从CommitLog中读出来,然后恢复消息原本的Topic和queueId等属性,重新写回CommitLog,然后Consumer就可以正常消费了。

3.事务消息

四.其他相关问题
1.主从复制:

同步复制(SYNC_MASTER):Master服务器和Slave都写成功后才返回给客户端写成功状态,优点是Master宕机后,Slave有全部的数据备份,消费者可以继续从Slave中消费,缺点是写入延迟增加,降低系统吞吐量;

异步复制(ASYNC_MASTER):Master写入Broker后立即返回客户端状态,优点延迟低吞吐量高,Master宕机后缺点未同步的数据可能丢失。

2.读写分离:

客户端只能写入数据到Master,消费者消费消息时根据消息的堆积情况选择从Master或Slave中拉取,当Master服务器的消息堆积超过物理内存的40%时,则会从Slave中拉取,这个比例可以在配置文件中设置。

3.为什么放弃ZooKeeper选择NameServer
RocketMQ早期使用了ZooKeeper做集群管理,后来放弃了转而使用自研的NameServer。
RocketMQ部署多主多从时,Broker 的Master和Slave不会部署在同一台机器上,谁主谁从是预先在配置文件中就设置好了的,无需Zookeeper的选举机制,NameServer的设计实现复杂度低,使得网络通信简单,性能得到大大的提升。

4.如何避免消息发送时丢失
消息发送方式
消息队列部署方式
刷盘方式

5.如何避免消费时被重复消费
消费者消费成功后未提交相应的offset到broker

消费者消费失败,service层未使用事务注解。

6.生产环境下 RocketMQ 为什么不能开启自动创建主题

自动创建的Topic可能只会分布在单一的broker的queue中,对于集群部署的broker来说,分布就不均一

7.各个消息队列产品比较

以上是关于RocketMQ 事务消息 原理及使用方法解析的主要内容,如果未能解决你的问题,请参考以下文章

❧消息队列解析 RocketMQ 业务消息——“事务消息”

RocketMQ事务机制的底层实现原理解析

28 RocketMQ:事务消息机制的底层实现原理

RocketMQ原理及解析

RocketMQ事务消息实现原理

RocketMQ——Transaction Message(事务消息)