Pulsar 的消息存储机制和 Bookie 的 GC 机制原理
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Pulsar 的消息存储机制和 Bookie 的 GC 机制原理相关的知识,希望对你有一定的参考价值。
参考技术A [TOC]本文是 Pulsar 技术系列中的一篇,主要简单梳理了 Pulsar 消息存储与 BookKeeper 存储文件的清理机制。其中,BookKeeper 可以理解为一个 NoSQL 的存储系统,默认使用 RocksDB 存储索引数据。
Pulsar 的消息存储在 BookKeeper 中,BookKeeper 是一个胖客户的系统,客户端部分称为 BookKeeper,服务器端集群中的每个存储节点称为 bookie。Pulsar 系统的 broker 作为 BookKeeper 存储系统的客户端,通过 BookKeeper 提供的客户端 SDK 将 Pulsar 的消息存储到 bookies 集群中。
Pulsar 中的每个 topic 的每个分区(非分区 topic,可以按照分区 0 理解,分区 topic 的编号是从 0 开始的),会对应一系列的 ledger,而每个 ledger 只会存储对应分区下的消息。对于每个分区同时只会有一个 ledger 处于 open 即可写状态。
Pulsar 在生产消息,存储消息时,会先找到当前分区使用的 ledger,然后生成当前消息对应的 entry ID,entry ID 在同一个 ledger 内是递增的。非批量生产的情况(producer 端可以配置这个参数,默认是批量的),一个 entry 中包含一条消息。批量方式下,一个 entry 可能包含多条消息。而 bookie 中只会按照 entry 维度进行写入、查找、获取。
因此,每个 Pulsar 下的消息的 msgID 需要有四部分组成(老版本由三部分组成),分别为(ledgerID,entryID,partition-index,batch-index),其中,partition-index 在非分区 topic 的时候为 -1,batch-index 在非批量消息的时候为 -1。
每个 ledger,当存在的时长或保存的 entry 个数超过阈值后会进行切换,同一个 partition 下的,新的消息会存储到下一个 ledger 中。Ledger 只是一个逻辑概念,是数据的一种逻辑组装维度,并没有对应的实体。
BookKeeper 集群中的每个 bookie 节点收到消息后,数据会分三部分进行存储处理,分别为:journal 文件、entryLog 文件、索引文件。
其中 journal 文件,entry 数据是按照 wal 方式写入的到 journal 文件中,每个 journal 文件有大小限制,当超过单个文件大小限制的时候会切换到下一个文件继续写,因为 journal 文件是实时刷盘的,所以为了提高性能,避免相互之间的读写 IO 相互影响,建议存储目录与存储 entrylog 的目录区分开,并且给每个 journal 文件的存储目录单独挂载一块硬盘(建议使用 ssd 硬盘)。journal 文件只会保存保存几个,超过配置个数的文件将会被删除。entry 存储到 journal 文件完全是随机的,先到先写入,journal 文件是为了保证消息不丢失而设计的。
如下图所示,每个 bookie 收到增加 entry 的请求后,会根据 ledger id 映射到存储到那个 journal 目录和 entry log 目录,entry 数据会存储在对应的目录下。目前 bookie 不支持在运行过程中变更存储目录(使用过程中,增加或减少目录会导致部分的数据查找不到)。
如下图所示,bookie 收到 entry 写入请求后,写入 journal 文件的同时,也会保存到 write cache 中,write cache 分为两部分,一部分是正在写入的 write cache, 一部分是正在正在刷盘的部分,两部分交替使用。
write cache 中有索引数据结构,可以通过索引查找到对应的 entry,write cache 中的索引是内存级别的,基于 bookie 自己定义的 ConcurrentLongLongPairHashMap 结构实现。
另外,每个 entorylog 的存储目录,会对应一个 SingleDirectoryDbLedgerStorage 类实例对象,而每个 SingleDirectoryDbLedgerStorage 对象里面会有一个基于 RocksDB 实现的索引结构,通过这个索引可以快速的查到每个 entry 存储在哪个 entrylog 文件中。每个 write cache 在增加 entry 的时候会进行排序处理,在同一个 write cache,同一个 ledger 下的数据是相邻有序的,这样在 write cache 中的数据 flush 到 entrylog 文件时,使得写入到 entrylog 文件中的数据是局部有序的,这样的设计能够极大的提高后续的读取效率。
SingleDirectoryDbLedgerStorage 中的索引数据也会随着 entry 的刷盘而刷盘到索引文件中。在 bookie 宕机重启时,可以通过 journal 文件和 entry log 文件还原数据,保证数据不丢失。
Pulsar consumer 在消费数据的时候,做了多层的缓存加速处理,如下图所示:
获取数据的顺序如下:
上面每一步,如果能获取到数据,都会直接返回,跳过后面的步骤。如果是从磁盘文件中获取的数据,会在返回的时候将数据存储到 read cache 中,另外如果是读取磁盘的操作,会多读取一部分磁盘上的时候,因为存储的时候有局部有序的处理,获取相邻数据的概率非常大,这种处理的话会极大的提高后续获取数据的效率。
我们在使用的过程中,应尽量避免或减少出现消费过老数据即触发读取磁盘文件中的消息的场景,以免对整体系统的性能造成影响。
BookKeeper 中的每个 bookie 都会周期的进行数据清理操作,默认 15 分钟检查处理一次,清理的主要流程如下:
通过上面的流程,我们可以了解 bookie 在清理 entrylog 文件时的大体流程。
需要特别说明的是,ledger 是否是可以删除的,完全是客户端的触发的,在 Pulsar 中是 broker 触发的。
broker 端有周期的处理线程(默认 2 分钟),清理已经消费过的消息所在的 ledger 机制,获取 topic 中包含的 cursor 最后确认的消息,将这个 topic 包含的 ledger 列表中,在这个 id 之前的(注意不包含当前的 ledger id)全部删除(包括 zk 中的元数据,同时通知 bookie 删除对应的 ledger)。
在运用的过程中我们多次遇到了 bookie 磁盘空间不足的场景,bookie 中存储了大量的 entry log 文件。比较典型的原因主要有如下两个。
原因一:
生产消息过于分散,例如,举个极端的场景,1w 个 topic,每个 topic 生产一条,1w 个 topic 顺序生产。这样每个 topic 对应的 ledger 短时间内不会因为时长或者存储大小进行切换,active 状态的 ledger id 分散在大量的 entry log 文件中。这些 entry log 文件是不能删除或者及时压缩的。
如果遇到这种场景,可以通过重启,强制 ledger 进行切换进行处理。当然如果这个时候消费进行没有跟上,消费的 last ack 位置所在的 ledger 也是处于 active 状态的,不能进行删除。
原因二:
GC 时间过程,如果现存的 enrylog 文件比较多,且大量符合 minor 或 major gc 阈值,这样,单次的 minor gc 或者 major gc 时间过长,在这段时间内是不能清理过期的 entry log 文件。
这是由于单次清理流程的顺序执行导致的,只有上次一轮执行完,才会执行下一次。目前,这块也在提优化流程,避免子流程执行实现过长,对整体产生影响。
以上是关于Pulsar 的消息存储机制和 Bookie 的 GC 机制原理的主要内容,如果未能解决你的问题,请参考以下文章