RocketMQ消息队列——消息存储详解

Posted 从善若水

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ消息队列——消息存储详解相关的知识,希望对你有一定的参考价值。

2016年双11前后阿里巴巴将 RocketMQ 捐赠给Apache基金会,很快就吸引了全球众多开源爱好者加入其社区生态中,并在2017年9月成为Apache基金会的顶级项目。
利用RocketMQ可以轻松实现应用解耦、流量消峰、消息分发等功能,并且消息队列还有保证最终一致性、方便动态扩容等功能。RocketMQ不会像ActiveMQ IO一样,随着使用的队列和虚拟topics的增加而影响性能。相比于Kafka ,RocketMQ具有更低的时延和更高的可靠性。所以RocketMQ已经成为了大型互联网服务架构里标配的中间件。
今天我们一起来了解一下RocketMQ消息队列的核心机制。

文章目录

RocketMQ消息队列——消息存储详解

一、RocketMQ各部分角色介绍

1.1 四个角色

      RocketMQ 有四个角色分别是Producer、Customer、Broker和NameServer。启动 RocketMQ 的顺序是先启动 NameServer,再启动Broker,这时候消息队列已经可以提供服务了,想发送消息就使用Producer来发送,想接收消息就使用Customer来接收。很多应用程序既需要发送消息又需要接收消息,可以启动多个Producer和Customer来发送多种消息,同时接收多种消息。

  • Producer 集群:生产者支持分布式部署。分布式生产者通过多种负载均衡模式向代理集群发送消息。发送进程支持快速失败和低延迟;
  • Consumer 集群:消费者支持以Push和Pull两种模型的分布式部署。它还支持集群使用和消息广播。它提供了实时消息订阅机制,能够满足大多数用户的需求;
  • NameServer 集群:
    • NameServer提供轻量级的服务发现和路由。每个NameServer记录完整的路由信息,并提供相应的读写服务,支持快速的存储扩展;
    • Broker管理,NameServer接受来自Broker集群的注册,并提供心跳机制来检查Broker是否活着;
    • 路由管理,每个NameServer将保存关于Broker集群的整个路由信息和供客户端查询的队列信息。
  • Broker 集群:
    • Broker服务器负责消息存储和传递、消息查询、HA保证等;
    • Broker通过提供轻量级的TOPIC和QUEUE机制来处理消息存储。 它们支持Push和Pull模型,包含容错机制(2个副本或3个副本),并提供强大的峰值填充和按原始时间顺序积累数千亿消息的能力。 此外,broker提供灾难恢复、丰富的度量统计和警报机制,这些都是传统消息传递系统所缺乏的。

1.2 两个关键词

      介绍了四个角色之后,还需要介绍一下 Topic 和 Message Queue 这两个名词。一个分布式消息队列中间件部署好以后,可以给很多个业务提供服务,同一个业务也有不同类型的消息要投递,这些不同类型的消息以不同的 Topic 名称来区分。所以发送和接收消息前,先创建 Topic,针对某个 Topic 发送和接收消息。有了 Topic 以后,如果一个 Topic 要发送和接收的数据量非常大,需要能支持增加并行处理的机器来提高处理速度,这时候一个 Topic 可以根据需求设置一个或多个 Message Queue。Topic 有了多个 Message Queue 之后,消息可以并行的向各个 Message Queue 发送,消费者也可以并行的从多个 Message Queue 读取消息并消费。

从上面的介绍大家应该能看出,Broker 是 RockerMQ的核心,大部分“重量级” 工作都是在Broker完成的,包括接收Producer发过来的消息、处理Customer的消费消息请求、消息的持久化存储、消息的 HA 机制以及服务端过滤功能等。
下面我们着重介绍角色Broker的消息存储功能。


二、Broker 消息存储

      分布式队列因为有高可靠性的要求,所以数据要通过磁盘进行持久化存储。用磁盘存储信息会不会很慢呢?我们来看看RocketMQ是怎么处理的。

目前的高性能磁盘,顺序写速度可以达到 600MB/s,超过了一般网卡的传输速度。但是磁盘随机写的速度只有大概 100KB/s,和顺序写的性能相差6000倍!因为有如此巨大的速度差异,好的消息队列系统会比普通的消息队列系统速度快好几个数量级。

RocketMQ 首先将消息数据写入操作系统 Page Cache,然后定时将数据刷入磁盘。下面主要介绍RocketMQ如何接收发送的消息请求并将信息写入Page Cache的,整个过程如下图:

2.1 Broker 接收客户端发送消息的请求并做预处理

  1. SendMessageProcessor.processRequest()方法会自动被调用接收、解析客户端请求为消息实例。该方法执行分为四个过程:解析请求参数、执行发送处理前的Hook、调用保存方法存储消息、执行发送处理后的Hook;

  2. Broker 存储前预处理消息。首先,设置请求处理返回对象标志,代码如下:

    final RemotingCommand response= 				
    RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
    response.setOpaque(request.getOpaque());
    

    Netty是异步执行的,也就是说,请求发送到Broker被处理后,返回结果时,在客户端的处理线程已经不再是发送请求的线程了,那么客户端如何确定返回结果对应哪个请求呢?很简单,通过返回标志来判断.其次,做一些列存储前发送请求的数据检查,比如死信消息处理、Broker是否拒绝事务消息处理、消息基本检查等。

  3. 执行 DefaultMessageStore.putMessage() 方法进行消息检查和存储模块检查。在真正保存消息前,会对信息数据做基本检查、对存储服务做可用性检查、对Broker做是否Slave的检查等,总结如下:

    1. 检验存储模块是否已经关闭;
    2. 检验Broker是否是Slave;
    3. 检验存储模块运行标记;
    4. 检验 Topic 长度;
    5. 检验扩展信息的长度;
    6. 检验操作系统 Page Cache 是否繁忙;
  4. 执行 org.apache.rocketmq.store.CommitLog.putMessage()方法,将消息写入 CommitLog。存储信息的核心处理过程如下:

    • 设置信息存储时间为当前时间戳,设置消息完整性校验码CRC;
    • 延迟消息处理。如果发送的消息是延迟消息,这里会单独设置延迟消息的数据字段,比如修改 Topic 为延迟消息特有的 Topic——SCHEDULRE_TOPIC_XXXX,并且备份原来的Topic 和 queueId,以便延迟消息在投递后被消费者消费,延迟消息的处理代码如下:
      final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
      if(tranType == MessageSysFlag.TRANSACTION_NOT_TYPE ||
      tranTypt == MessageSysFlag. TRANSACTION_COMMIT_TYPE )
      //单独处理延迟消息
      if(msg.getDelayTimeLevel() > 0)
          if(msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
          
      
      Topic = ScheduleMessageService.SCHDULE_TOPIC;
      queuId = ScheduleMessageService.delayLevell2QueueId(msg.getDelayTimeLevel());
      
      //备份原 Topic、queueId
      MessageAccessor.putProperty(msg,MessageConst.PROPERTY_REAL_TOPIC,msg.getTopic());
      MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID,String.valueOf(msg.getQueueId()));
      msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
      msg.setTopic(Topic);
      msg.setQueueId(queueId);
      
      
    • 获取最后一个CommitLog 文件实例 MappedFile,锁住该 MappedFile。默认为自旋锁,也可以通过 useReentranLockWhenPutMessage 进行配置、修改和使用ReentranLock;
    • 检验最后一个 MappedFile 如果结果为空或已写满,则新建一个 MappedFile返回;
    • 调用 MappedFile.appendMessage(final MessageExtBrokerInner msg,final Append MessageCallback cb),将消息写入 MappedFile;根据消息是单个消息还是批量消息来调用AppendMessageCallback.doAppend() 方法,并将信息写入Page Cache。

2.2 内存映射机制与高效写磁盘

❓在了解了RocketMq存储信息的过程之后,我们来看一下RocketMQ如何保证高效存储呢。

RocketMQ在存储设计中通过内存映射、顺序写文件等方式实现了高吞吐。
下面首先介绍一些RocketMQ的基本数据结构:

org.apache.rocketmq.store.CommitLogRocketMQ 对存储消息的物理文件的抽象实现,也就是物理CommitLog文件的具体实现
org.apache.rocketmq.store.MappedFileCommitLog 文件在内存中的映射文件,映射文件同时具有内存写入速度和与磁盘一样可靠的持久化方式
org.apache.rocketmq.store.MappedFileQueue映射文件队列中有全部的 CommitLog 映射文件,第一个映射文件为最先过期的文件,最后一个文件是最后过期的文件,最新的消息总是写入最后一个映射文件

CommitLog、MappedFileQueue、MappedFile和物理CommitLog文件之间的关系如下图:

每个MappedFileQueue 包含多个MappedFile,就是真实的物理 CommitLog文件。在Java中通过java.nio.MappedByteBuffer 来实现文件的内存映射,即文件读写都是通过 MappedByteBuffer来操作的(其实是Page Cache)。

写入数据的时候先加锁,然后通过Append的方式写入最新 MappedFile。对于读取消息,大部分情况下用户只关心最新数据,而这些数据都在 Page Cache 中,也就是说,读写文件就是在 Page Cache中进行的,其速度几乎等于直接操作内存的速度。

2.3 文件刷盘机制

消息存储完之后,会被操作系统持久化到磁盘,也就是刷盘。RocketMQ支持两种刷盘方式,分别为同步刷盘和异步刷盘。下面我们分别介绍这两种机制的实现方式。

2.3.1 同步刷盘

在Broker存储消息到Page Cache之后,同步将Page Cache刷到磁盘(由GroupCommitService线程服务来负责),再返回客户端。流程图如下:

详细过程如下图:

存储消息线程负责将消息存储到Page Cache或者DM(直接内存)中,存储成功后通过调用handleDiskFlush()方法将同步刷盘请求发送给 GroupCommitService服务,并在该刷盘请求上执行锁等待,代码实现如下:

final  GroupComiitService service = (GroupCommitService)this.FlushCommitLogService;
if(messageExt.isWaitStoreMsgOK()) //客户端可以设置,默认为 True
    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
    service.putRequest(request);//保存同步刷盘请求
    boolean flushOK = request.waitForFlush (//请求同步锁等待
        this.defaultMessageStore.getMessageStoreConfig().getSyncFLushTimeout()); 
    if (!flushOK) 
        log.error (.**); //记录刷盘超吋日志
        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); 
    
 else 
    service.wakeup() ;//异步刷盘,不用同步返回

同步刷盘服务线程会间隔10ms轮询一次,查看是否有同步刷盘请求。如果有则执行服务,刷盘成功之后唤醒等待刷盘请求锁的存储线程消息线程,并告知刷盘操作执行成功。代码如下:

boolean flushOK=false;
for(int i=0;i<2 && !flushOK;++i)
    //判断当前刷盘请求的消息是否已经刷盘
    flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >=req.getNextOffset();
    if(!flushOK)
        CommitLog.this.mappedFileQueue.flush(0); //执行刷盘
  
req.wakeupCustomer(flushOK);//唤醒存储消息线程  

2.3.2 异步刷盘

在Broker存储消息到Page Cache之后,立即返回客户端,然后异步刷盘服务(FlushRealTimeService线程)将Page Cache异步刷到磁盘。流程图如下:

💡还有第三个服务——异步转存服务。对应的服务线程是CommitRealTimeService,这部分涉及Broker的读写分离功能,我们在这里不做详细介绍。

在异步转存服务(开启读写分离功能之后)和存储服务把消息写入Page Cache之后,由异步刷盘服务将消息刷入磁盘中,过程如下图:

  1. 第一步获取刷盘参数,关键参数如下:

    flushCommitLogTimed是否定时刷新。默认为False,即实时刷新
    interval设置刷盘间隔。默认500ms
    flushPhysicQueueLeastPages每次刷盘的页数,默认为4页
    flushPhysicQueueThoroughInterval两次刷盘操作的最长间隔时间,默认为10s
  2. 等待刷盘间隔。代码如下:

    if(flushCommitLogTimed)//定时刷盘
    	Thread.sleep(interval);
    else //实时刷盘
        this.waitForRunning(interval);
    
    

    this.waitForRunning() 方法是RocketMQ通过自定义锁实现的线程等待,详细代码如下:

    protected void waitForRunning(long interval)
    	    if(hasNotified.compareAndSet(true,false)) 
    	        this.onWaitEnd();
    	        return ;
    	    
    	    //waitPoint 是基于CountDownLatch 重写的 CountDownLatch2,增加了重置功能
    	    waitPoint.reset();
    	    try
    	         waitPoint.await(interval,TimeUnit.MILLISECONDS);
    	    catch(InterruptedException e)
    	        log.error("Interrupted",e);
    	    finally
    	        hasNotified.set(false);
    	        this.onWaitEnd();
    	    
    	
    

    当数据存储到Page Cache后,通过调用org.apache.rockermq.store.CommitLog.handleDiskFlush()方法唤醒异步刷盘线程。

  3. 执行刷盘。最终的刷盘逻辑在org.apache.rockermq.store.MappedFile.flush()中实现,代码如下:

    if(this.isAbleToFlush(flushLeastPages))  
        if(this.hold()) 
            int value = getReadPosition(); 
            try 
                //读写分离
                if(writeBuffer != null || this.fileChannel.position() != 0)  
                    this.fileChannel.force(false);
                 else  
                    //非读写分离
                    this.mappedByteBuffer.force(); 
                
             catch (Throwable e) 
                log.error("Error occurred when force data to disk.", e); 
            
            
            this.flushedPosition.set(value);
            this.release(); 
         else 
            log.warn("in flush, hold failed, flush offset = " + this.flushedPositicn.get());
            this.flushedPosition.set(getReadPosition());
        
    
    

    上面的代码对数据进行了两次校验。this.isAbleToFlush(flushLeastPages) 方法校验需要刷盘的页码中的数据是否被刷入磁盘,如果被刷入磁盘,则不用再执行刷盘操作;反之,则需要计算是否还有数据需要刷盘。this.hold() 方法的功能是,在映射文件被销毁时尽量不要对在读写的数据造成困扰。所以MappedFile自己实现了引用计数功能,只有存在引用时才会执行刷盘操作。
    在配置了读写分离功能之后,writeBuffer和fileChannel总是不为空。此时要调用 this.fileChannel.force(false) 方法刷盘;而正常刷盘则是调用 this.mappedByteBuffer.force() 方法。

  4. 记录Checkpoint 和耗时日志。这里主要记录最后刷盘成功时间和刷盘耗时超过500ms的情况。

2.3.3 同步刷盘 v.s. 异步刷盘

异步实时刷盘异步定时刷盘同步刷盘
数据一致性
数据可靠性
数据可用性
系统吞吐量

三、总结

堆积能力是消息队列的一个重要考核指标。存储机制是RocketMQ中的核心,也是亮点设计,因为存储机制决定了写入和查询的效率。我们这里详细介绍了RocketMQ的数据存储流程,希望能够让读者有一个深入的了解。

以上是关于RocketMQ消息队列——消息存储详解的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ的死信队列

RocketMQ的死信队列

RocketMQ使用之消息保证,重复读,积压,顺序,过滤,延时,事务,死信

RocketMQ消息队列——消息存储详解

RocketMQ消息队列——消息存储详解

RocketMQ 死信队列 | 消费者出现异常如何处理?