RocketMQ:消息ACK机制源码解析

Posted 又蠢又笨的懒羊羊程序猿

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ:消息ACK机制源码解析相关的知识,希望对你有一定的参考价值。

消息消费进度

概述

消费者消费消息过程中,为了避免消息的重复消费,应将消息消费进度保存起来,当其他消费者再对消息进行消费时,读取已消费的消息偏移量,对之后的消息进行消费即可。

消息模式分为两种:

  • 集群模式:一条消息只能被一个消费者消费
  • 广播模式:一条消息被所有消费者都消费一次

广播模式下,消息被所有消费者消费,因此消息消费的进度可以跟消费端保存在一起,即本地保存。

集群模式下,消息只能被集群内的一个消费者消费,进度不能保存在消费端,否则会导致消息重复消费,因此集群模式下消息进度集中保存在Broker中。

消息进度存储接口

OffsetStore

public interface OffsetStore 
	//加载消息消费进度
    void load() throws MQClientException;

    //更新消费进度并保存在内存中
    void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly);

	//从本地存储中获取消息消费进度
    long readOffset(final MessageQueue mq, final ReadOffsetType type);

	//保存所有消息消费进度-本地/远程
    void persistAll(final Set<MessageQueue> mqs);

    void persist(final MessageQueue mq);

 	//移除偏移量
    void removeOffset(MessageQueue mq);

    //根据Topic克隆一份消息队列消费进度缓存表
    Map<MessageQueue, Long> cloneOffsetTable(String topic);

   //更新消费进度到Broker
    void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
        MQBrokerException, InterruptedException, MQClientException;
   

DefaultMQPushConsumerImpl#start

switch (this.defaultMQPushConsumer.getMessageModel()) 
    case BROADCASTING:
        //广播模式下 将消息消费进度存储到本地
        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
        break;
    case CLUSTERING:
        //集群模式下 将消息消费的进度存储到远端Broker中
        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
        break;
    default:
        break;

this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);

如上所示,根据消息消费模式的不同,会创建不同的OffsetStore对象。


广播模式消费进度存储(LocalFileOffsetStore)

public class LocalFileOffsetStore implements OffsetStore 
    //存储目录
    //消费者启动时-可以通过"-D rocketmq.client.localOffsetStoreDir=路径"来指定
    public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
        "rocketmq.client.localOffsetStoreDir",
        System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
    private final static InternalLogger log = ClientLogger.getLog();
    //MQ客户端
    private final MQClientInstance mQClientFactory;
    //消费组名
    private final String groupName;
    //存储路径
    private final String storePath;
    //以MessageQueue为键-消费偏移量为值的缓存表
    private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>();

构造函数

public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) 
    this.mQClientFactory = mQClientFactory;
    this.groupName = groupName;
    //json格式存储
    this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator +
        this.mQClientFactory.getClientId() + File.separator +
        this.groupName + File.separator +
        "offsets.json";

LocalFileOffsetStore#load

public void load() throws MQClientException 
    //从本地磁盘中进行读取json文件-并进行序列化封装转化为map
    OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
    if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) 
        //存入缓存表
        offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());

        for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) 
            AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
            log.info("load consumer's offset,   ",
                this.groupName,
                mq,
                offset.get());
        
    


public class OffsetSerializeWrapper extends RemotingSerializable 
    private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>();

LocalFileOffsetStore#persistAll

public void persistAll(Set<MessageQueue> mqs) 
    if (null == mqs || mqs.isEmpty()) 
        return;
    
	
    OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
    for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) 
        if (mqs.contains(entry.getKey())) 
            AtomicLong offset = entry.getValue();
            //填充<消息队列-消费偏移量>缓存表
            offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
        
    

    //转为json格式
    String jsonString = offsetSerializeWrapper.toJson(true);
    if (jsonString != null) 
        try 
            //jsonString->file->保存到storePath
            MixAll.string2File(jsonString, this.storePath);
         catch (IOException e) 
            log.error("persistAll consumer offset Exception, " + this.storePath, e);
        
    

persistAll()的入口是MQClientInstance#startScheduledTask

MQClientInstance#startScheduledTask

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() 

    @Override
    public void run() 
        try 
            MQClientInstance.this.persistAllConsumerOffset();
         catch (Exception e) 
            log.error("ScheduledTask persistAllConsumerOffset exception", e);
        
    
, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

消费端启动后延迟10s开启该定时任务,每隔5s进行一次持久化。

MQClientInstance#persistAllConsumerOffset

Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) 
    Entry<String, MQConsumerInner> entry = it.next();
    MQConsumerInner impl = entry.getValue();
    impl.persistConsumerOffset();

DefaultMQPushConsumerImpl#persistConsumerOffset

try 
    this.makeSureStateOK();
    Set<MessageQueue> mqs = new HashSet<MessageQueue>();
    //获取重负载分配好的消息队列
    Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
    mqs.addAll(allocateMq);
	//当前是LocalFileOffsetStore.persistAll
    this.offsetStore.persistAll(mqs);
 catch (Exception e) 
    log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);

集群模式消费进度存储(RemoteBrokerOffsetStore)

RemoteBrokerOffsetStore

private final static InternalLogger log = ClientLogger.getLog();
//MQ客户端实例-该实例被同一个JVM下的消费者和生产者共用
private final MQClientInstance mQClientFactory;
//消费组名
private final String groupName;
//以消息队列为键-消费偏移量为值的缓存表
private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
    new ConcurrentHashMap<MessageQueue, AtomicLong>();

RemoteBrokerOffsetStore#persistAll

for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) 
    MessageQueue mq = entry.getKey();
    AtomicLong offset = entry.getValue();
    if (offset != null) 
        if (mqs.contains(mq)) 
            try 
                //更新消费偏移量到Broker
                this.updateConsumeOffsetToBroker(mq, offset.get());
                log.info("[persistAll] Group:  ClientId:  updateConsumeOffsetToBroker  ",
                    this.groupName,
                    this.mQClientFactory.getClientId(),
                    mq,
                    offset.get());
             catch (Exception e) 
                log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
            
         
    

RemoteBrokerOffsetStore#updateConsumeOffsetToBroker

同步更新消息消费偏移量,如Master关闭,则更新到Slave。

//从MQ客户端中根据BrokerName获取消息队列对应的Broker
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) 
    //根据Topic从NameServer更新路由信息
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
    //重新查找
    findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());


if (findBrokerResult != null) 
    //封装消息消费队列更新请求头
    UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
    requestHeader.setTopic(mq.getTopic());			//主题信息
    requestHeader.setConsumerGroup(this.groupName);	 //消费者组
    requestHeader.setQueueId(mq.getQueueId());		//队列ID
    requestHeader.setCommitOffset(offset);			//消费偏移量

    if (isOneway) 
        //Oneway->根据Broker地址->发送请求将消费偏移量保存到Broker-超时时间默认为5s
        this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
            findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
     else 
        this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
            findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
    
 else 
    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);


RemoteBrokerOffsetStore#updateOffset

if (mq != null) 
    //从缓存中获取消息队列对应的偏移量
    AtomicLong offsetOld = this.offsetTable.get(mq);
    //为空
    if (null == offsetOld) 
        //将存入的offset存入内存中
        offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
    
	//不为空->根据increaseOnly更新原先的offsetOld
    if (null != offsetOld) 
        if (increaseOnly) 
            MixAll.compareAndIncreaseOnly(offsetOld, offset);
         else 
            offsetOld.set(offset);
        
    


RemoteBrokerOffsetStore#readOffset

public long readOffset(final MessageQueue mq,	//消息队列
                       final ReadOffsetType type) 	//读取偏移量类型
    if (mq != null) 
        switch (type) 
            case MEMORY_FIRST_THEN_STORE:
            //从内存中读取    
            case READ_FROM_MEMORY: 
                AtomicLong offset = this.offsetTable.get(mq);
                if (offset != null) 
                    return offset.get();
                 else if (ReadOffsetType.READ_FROM_MEMORY == type) 
                    return -1;
                
            
            //从Broker中读取    
            case READ_FROM_STORE: 
                try 
                    //从Broker中获取消费偏移量
                    long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
                    AtomicLong offset = new AtomicLong(brokerOffset);
                    //更新至内存中(map)
                    this.updateOffset(mq, offset.get(), false);
                    return brokerOffset;
                
                // No offset in broker
                catch (MQBrokerException e) 
                    return -1;
                
                //Other exceptions
                catch (Exception e) 
                    log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);
                    return -2;
                
            
            default:
                break;
        
    

RemoteBrokerOffsetStore#fetchConsumeOffsetFromBroker

//从MQ客户端实例中获取Boker信息
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) 
	//从NameServer中更新Topic的路由信息
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
    //重新获取Broker
    findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());


if (findBrokerResult != null) 
    //封装查询消费进度请求头
    QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
    requestHeader.setTopic(mq.getTopic());
    requestHeader.setConsumerGroup(this.groupName);
    requestHeader.setQueueId(mq.getQueueId());
	//带上请求头调用MQClientAPI到Broker中获取
    return this.mQClientFactory.getMQClientAPIImpl()以上是关于RocketMQ:消息ACK机制源码解析的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ:消息存储机制详解与源码解析

RocketMQ源码(12)—Broker 的消息刷盘源码深度解析一万字

深度解析RocketMQ Topic的创建机制

RocketMQ之offset确认机制

RocketMQ源码解析-消息消费

RocketMQ事务消息机制