Pulsar工作原理
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Pulsar工作原理相关的知识,希望对你有一定的参考价值。
参考技术A 参考文档:Pulsar:
https://jack-vanlightly.com/blog/2018/10/2/understanding-how-apache-pulsar-works
https://jack-vanlightly.com/blog/2018/10/21/how-to-not-lose-messages-on-an-apache-pulsar-cluster
https://www.splunk.com/en_us/blog/it/effectively-once-semantics-in-apache-pulsar.html
BookKeeper:
https://medium.com/splunk-maas/apache-bookkeeper-insights-part-1-external-consensus-and-dynamic-membership-c259f388da21
https://medium.com/splunk-maas/apache-bookkeeper-internals-part-1-high-level-6dce62269125
https://medium.com/splunk-maas/apache-bookkeeper-internals-part-2-writes-359ffc17c497
https://medium.com/splunk-maas/apache-bookkeeper-internals-part-3-reads-31637b118bf
https://medium.com/splunk-maas/apache-bookkeeper-internals-part-4-back-pressure-7847bd6d1257
https://medium.com/splunk-maas/a-guide-to-the-bookkeeper-replication-protocol-tla-series-part-2-29f3371fe395
中文(翻译的不太行,建议看英文):
https://blog.51cto.com/u_13491808/3124971
https://juejin.cn/post/7034135607869702157
https://juejin.cn/post/6948044887941971998
https://blog.csdn.net/zhaijia03/article/details/112691063
https://xie.infoq.cn/article/5dc2c3a2afa62d1eec3061fec
https://xie.infoq.cn/article/35b764ea84155a0f41542df6f
本文暂不关心Pulsar的消息消费(如批量消费),Topic分区,和消息存储时间(TTL和Retention)等的具体细节,而主要探讨和总结Pulsar的工作原理,部分地方会和Kafka进行对比。
Pulsar的总体架构如下,
总共有3个部分,Broker、BookKeeper、Zookeeper。通常,我们讲Pulsar的话,主要都是指Broker,另外两个模块也是独立的Apache项目。
Broker本身没有存储,因此也不会丢失数据,Broker运行需要的数据都来自BookKeeper和Zookeeper。Broker对client提供服务,处理client的请求。
BookKeeper主要存储消息数据,因而也是需要存储最大的地方,单个BookKeeper也叫Bookie。BookKeeper本身是有WAL的消息存储引擎。
Zookeeper主要存储元数据,包括Broker和BookKeeper的元数据。
Pulsar提供的消息读写模式与Kafka比较类似,都是按Topic来关系消息,都有消息顺序性保证,都有Offset机制,在Pulsar中叫Cursor。
Client发起请求,包括读/写请求,会先发送到Broker,Broker来判定,当前请求消息的Cursor在哪个Bookie上,然后访问对应的Bookie读取/写入消息并返回。这个只是最简单的流程,Pulsar中有Cache机制,实际的流程会比这个更复杂一些。
出于消息可靠性的考虑,Pulsar会将消息存储多份,也就是说,相同的消息会存在多个Bookie上。
关于消息数据的存储,首先要理解下面这张图,
上图中,从上到下,每个层次的名称是:Topic(主题),Ledger(账本),Fragment(片段),Entry(条目)。逐一解释一下,
其中,Ledger、Fragment、Entry是BookKeeper中的概念,Topic是Pulsar中的概念。在Pulsar官方文档中还出现了Managed Ledger概念,没有特别看出和Ledger的差别,应该是和BookKeeper的Ledger等价的Pulsar概念。
在实际的物理结构上,数据存储的分布如下图,
可以看到,一个Topic对应多个Ledger,一个Ledger有1个或多个Fragment,不同的Fragment分布在不同的Bookie上,存储了多份。Ledger、Fragment如何切分,如何分布在Bookie的元数据,统一存储在Zookeeper。
存储结构与Kafka的最大差别是:一个Topic包含多个Ledger,Kafka是1个Ledger;以Fragment为单位进行分布式存储,Kafka是以Ledger为单位分布式存储。可以很明显的感觉到,Pulsar的存储分的很细,而且做到了物理存储结构与逻辑结构相隔离,最终达到,只要扩展Bookie集群就能提升整体可用性和性能。
存储的配置是以Ledger为单位来管理的,最重要的配置有三个,
全体数量,E,表示Ledger可以写入的总体Bookie池的Bookie数量;写入数量,Qw,表示对于每个Entry,Ledger需要写入的份数;响应数量,Qa,表示当写入返回多少个Ack时,返回给客户端,即写入成功。通常情况下,E >= Qw >= Qa。
先来看Qa和Qw,举例,Qa=2,Qw=3。也就是说,对于每个写入的Entry,需要复制3份,也就是存储在3个Bookie上;但只要已经收到成功写入2份的Ack,就表示成功写入,返回给客户端。在这个配置下,如果宕机了1个Bookie,数据是完全可以恢复回来的,但是宕机2个Bookie的话,数据就可能出现丢失。如果想宕机2个Bookie数据仍然可以不丢失,那么至少需要配置Qa=3。
也就是说,Qa是保证数据不丢失的最小数据复制份数,这个取决于应用场景,需要恢复何种宕机程度的数据。这个概念和Kafka的in sync replica很相似。
再来看Qw和E,举例,Qw=3,E=3。这个情况下,对于每个写入的Entry,需要复制到当前Fragment每个Bookie上,如下图,
可以看到,Entry按写入的顺序紧密排列,如果是Qw=3,E=5的情况下,可用Bookie的数量比写入Bookie的数量要多,写入的Entry的排列会出现空洞,如下图,
这样的现象,Pulsar称为Striping。这种情况下,写入的tps会提高,但是读取的性能会下降,最终增大整体的延迟。在这种情况下,BookKeeper的顺序读取被打破,降低整体性能,因此不建议使用。
因此,通常情况下,取E = Qw >= Qa,例如,E=3,Qw=3,Qa=2。
同样不建议取Qa=1。这是一个危险的设置,如果唯一的Bookie宕机,那么就不知道Entry是否已写入。Bookie的恢复会因无法进行而停止。
Brookie也可以配置机架感知(rack-awareness),当配置了机架感知策略时,Broker会尝试选取不同机架的Bookie节点。当然也可以自定义其他选取策略。
Pulsar的Broker不存储数据,因此也不会丢失。Jack Vanlightly的博客原文是这样,
这里并没有无状态的意思,很多中文翻译博客把这里翻译成,Broker是无状态的,甚至把这一句放在非常开头的地方,但其实是不对的。Broker只是不存储有状态的数据而已,本身在内存中是有状态的。Broker和其他Broker并不对等。
每个Topic都归一个Broker所有,所有的读写都需要通过这个Broker进行。写入过程如下图,
可以看到,上图例子中,Qw=3,Broker收到写入请求的时候,先写入Bookie,Bookie完成写入请求后返回Ack,Broker收到Qa个Ack后返回Ack给客户端。如果Bookie返回失败或者无返回,那么Broker会发起创建新Fragment。
读取过程如下图,
因为Topic所有的请求都需要通过所有者Broker,那么,我们可以在这个Broker上引入Cache机制,提升读的QPS。
这样的缓存机制会对不同消费者有比较大的性能差异,如果是追尾消费者(tail reader),即一直追踪Entry最新变化的消费者,当有Entry写入时,会更新Cache,于是直接从缓存中取走Entry;但如果是追赶消费者(catch-up reader),即读取的是老的Entry,例如消费者宕机后重启,中间堆积了一段时间的消息的情况,此时,缓存中没有数据,必须去Bookie上读取,再返回给客户端。由于无法直接从缓存获取Entry,追赶消费者获取消息的性能是要比追尾消费者差很多的。
Broker因为是有状态的,无法做到非常完美的灾备切换,只能在故障后尽快恢复Broker的工作场景。
Broker故障恢复中有一个非常重要的概念,最新确认序号(Last Added Confirmed ,LAC)。这个表示当前Ledger最后commit的序号,也就是收到Qa个Ack的Entry的序号。Pulsar约定,读取数据不可以读取LAC之后的数据,读取LAC之后的数据是没用一致性和正确性保障的,视为脏读。
理解LAC之后,就可以理解Broker故障恢复的栅栏阻挡机制(Fencing)了。步骤如下,
Fencing解决方案解决了脑裂问题,也没有数据丢失。故障恢复,Ledger的状态流程图,
这个方案和 Raft Leader 的故障恢复机制实际上是没有什么差别的,应该是有所借鉴。至于解决了脑裂,这个也不是真正解决,也是由于消息系统的特性导致的直接结果。为什么这么说呢?
Raft 中也有类似的概念,叫 committed index(Pulsar与之对应的是LAC),只有在收到多数节点写入 Entry 返回成功之后,才可以更新 committed index,再更新 Entry 到状态机中,并返回给客户端。对尚未更新 committed index 的 Entry,Raft 也是不可读的。可以发现,Pulsar 的 Fencing 和 Raft 的机制几乎一致,但是 Raft 有脑裂问题。
先回顾一下 Raft 的脑裂问题。当 Raft Leader 节点故障发生时,例如 Raft Leader 网络断开,其他节点已经发现当前 Leader 超时,并发起下一轮选举投票,快速选举出新的 Leader,但是老 Raft Leader 的 Follower 无响应超时时间尚未到达,导致老 Leader 仍然认为自己是真正的 Leader,并响应客户端的请求,因此导致客户端读取到了旧的数据。而与此同时,部分客户端连接到了新 Raft Leader,写入并读取到新的数据,造成不一致,这是 Raft 发生脑裂的原因。Raft 发生脑裂不会持续很长时间,当老 Leader 发现长时间没有收到 Follower 响应而超时(主要取决于超时参数的配置),或者发现有新 Leader 产生时,老 Leader 就会将自己重置为 Follower。
那使用了同样机制的 Pulsar 为什么就没有脑裂问题呢?那是因为,Pulsar 是个消息系统,写入的消息类似 WAL,是不可变的(immutable),追加的。当发生和 Raft 一样的故障的时候,老的 Pulsar Broker 也会读到老的数据,但老的数据仍然合法,因为对同样的 Cursor,在新的 Broker 上也是读到相同的数据,只要读取 Entry 不超过 LAC 就没问题,最多只是无法获取到最新的消息而已,获取的消息并不会错。而 Raft 的存储是偏向于通用存储场景,因此就会有新旧数据版本不一致的问题。
脑裂一般都是指读取数据发生的不一致,如果是写入数据的脑裂,那可能是分布式算法有问题,成熟的算法一般不会有这个问题。
BookKeeper的存储引擎是可插拔的,默认是DbLedgerStorage,整体架构如下,
Bookie写入的流程如下,
Bookie是一个有WAL的消息存储,写入时,会先写入WAL(Journal),再写入Write Cache。Write Cache会定期的将数据排序并写入磁盘的Entry Log文件中。排序过程,将不同Ledger的消息聚合在一起,这样,在读取Ledger的时候,就是完全的磁盘顺序读。如果没有排序聚合的话,就无法获得顺序读的性能。
写入Write Cache的时候,也会把索引信息写入RocksDB,索引信息很简单,就是 (ledgerId, entryId) 到 (entryLogId, 文件偏移量) 的映射。
Bookie可以缓存最近写入的Entry和最近读取的Entry,读取的顺序是: Write Cache -> Read Cache -> Bookie上的Entry。当两个缓存都没有命中的时候,会到RocksDB中查找该Entry所在的文件和偏移量,并读取该Entry,然后缓存再Read Cache中,以期之后可以命中。
BookKeeper可以支持磁盘IO分离,将写入WAL的放在一个高速磁盘上,其他数据放在低速磁盘上。当有写入Entry请求时,只会发生写WAL的磁盘同步IO操作,其他都是写入内存缓存。同时,以异步的方式,将Write Cache中的数据以批量写的方式写入到Entry Log文件和RocksDB中。
Bookie的Journal可以有多个,但和Ledger并不是一一对应的。4.5.0之后的BookKeeper可以配置journalDirectories参数,如,journalDirectories=/tmp/bk-journal1,/tmp/bk-journal2,配置多个目录,由Bookie统一管理。
当Bookie故障的时候,所有在这个Bookie上有Fragment的Ledger都需要复制。恢复过程是重复制Fragment,来确保每个Ledger满足Qw个复制因子。
有两种恢复方法:自动和手动,主要讨论自动方案。自动方案包括内置的故障节点检测机制,手动就需要人为干预。具体的复制过程,两者是一致的。
恢复过程可以通过在Bookie集群上运行AutoRecoveryMain来完成。其中一个自动恢复进程被选举为Auditor,Auditor来检测故障的Bookie,然后,
如果Auditor失败,就再选举一个Auditor。Auditor只是AutoRecoveryMain的一个线程。AutoRecoveryMain也有运行Replication Task Worker的线程,每个Worker监听/underreplicated Znode获取任务。发现任务后,就尝试lock住这个任务,如果lock失败,说明其他Worker已经拿到这个任务,就去寻找下一个任务。
如果获取到了锁,那么需要,
如果Ledger的所有Fragment都已经完全复制,则删除/underreplicated任务;如果仍然存在未完全复制的Fragment,则释放锁,等待其他Worker处理。
如果一个Fragment没有结束Entry id,Worker的复制任务会等待并再次检查。如果还是没有,说明之前的数据副本可能没有完全写入,会发起Fencing任务,然后再继续重复制。
总结部分直接抄了中文翻译,
Jack Vanlightly在博客中表示,Pulsar的两个突出特点是,
后一点应该是独创,前一点应该是借鉴了Raft,之前提到过。
以上是关于Pulsar工作原理的主要内容,如果未能解决你的问题,请参考以下文章