Apache Pulsar 调研

Posted zhisheng_blog

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache Pulsar 调研相关的知识,希望对你有一定的参考价值。

  1. Apache Pulsar

  2. 1. Kafka 概述

    1. 1.1 现存问题

    2. 1.3 优点

    3. 1.4 缺点

  3. 2. Pulsar 架构

    1. 2.4.1 三种写路由策略

    2. 2.4.2 四种读下发策略

    3. 2.4.3 Pull & Push 可选请求模式

    4. 2.4.4 Consume ACK 与 unACK

    5. 2.4.5 Data Retention

    6. 2.3.1 多租户

    7. 2.3.2 Topic 分配

    8. 2.3.3 Topic Lookup

    9. Broker 的 LoadManager 线程

    10. bundle 与 ownership

    11. Topic 分配流程

    12. 设计优点

    13. 2.1.1 数据集合

    14. 2.1.2 存储节点

    15. 2.1.3. 一致性保证

    16. 2.1 Pulsar VS Kafka

    17. 2.2 Pulsar 架构

    18. 2.3 多租户与 Topic Lookup

    19. 2.4 Produce / Consume 策略

  4. 3. Bookkeeper 架构

    1. 3.4.1 读被均摊

    2. 3.4.2 读有预期

    3. 3.4.3 读结果无序

    4. 3.3.1 三种文件

    5. 3.3.2 ADD 操作

    6. 3.3.3 结论

    7. 3.1.1 特性

    8. 3.1.2 Ensemble Size / Ensembles / Write Quorum / ACK Quorum / Segment(Ledger) / Fragment

    9. 3.1.3 结论

    10. 3.1 概念

    11. 3.2 架构

    12. 3.3 写流程

    13. 3.4 读流程

  5. 4. 水平扩容

    1. 4.1 水平扩展 Broker

    2. 4.2 水平扩展 Bookie

  6. 5. Pulsar Consistency

    1. 5.3.1 场景

    2. 5.3.2 流程

    3. 5.3.3 结论:Broker 故障秒级恢复

    4. 5.2.1 场景

    5. 5.2.2 流程

    6. 5.2.3 结论:Bookie 故障秒级恢复

    7. 5.1.1 冗余副本

    8. 5.1.2 一致性机制

    9. 5.1 一致性机制

    10. 5.2 Bookie Auto Recovery:Ensemble Change

    11. 5.3 Broker Recovery:Fencing

  7. 6. Distributed Log 与 Raft

    1. 6.1 概念对比

    2. 6.2 流程对比

    3. 6.3 总结

  8. 7. 总结

    1. 7.1 Pulsar 的优点

    2. 7.2 Pulsar 的缺点

简要总结下对 Pulsar 的调研。

Apache Pulsar

内容:

  • Kafka : 优缺点。

  • Pulsar : 多租户,Topic Lookup,生产消费模式

  • Bookkeeper : 组件概念与读写流程

  • Horizontal Scale : Broker 或 Bookie 的横向扩展

  • Consistency : Broker 或 Bookie crash 后保证日志一致性

  • Distributed Log & Raft 算法

  • 总结

1. Kafka 概述

1.1 现存问题

主要问题:

  • 负载均衡需人工介入:手动按异构配置的 broker 对应生成 assignment 执行计划。

  • 故障恢复不可控:broker 重启后需复制分区新数据并重建索引,其上的读写请求转移到其他 broker,流量激增场景下可能会导致集群雪崩。

其他问题:

  • 跨数据中心备份需维护额外组件:MirrorMaker 官方也承认鸡肋,做跨机房的冗余复制依赖第三方组件如 uber 的 uReplicator

注:已脱敏。

1.3 优点

  • 生态成熟,易与 Flink 等现有组件集成。

  • 可参考资料多,完善的官方文档和书籍。

  • 模型简单易上手:partition 有 replication,以 segment 和 index 方式存储。

1.4 缺点

计算与存储耦合

  • 存储节点有状态:读写只能走 Partition Leader,高负载集群中 Broker 重启容易出现单点故障,甚至雪崩。

  • 手动负载均衡:集群扩容必须手动 Assign Partitions 到新 Broker,才能分散读写的负载。

漫画对比:https://jack-vanlightly.com/sketches/2018/10/2/kafka-vs-pulsar-rebalancing-sketch

2. Pulsar 架构

2.1 Pulsar VS Kafka


PularKafka
数据集合Topic, PartitionTopic, Partition
存储节点及读写组件Bookkeeper BookieBroker

Pulsar BrokerClient SDK
数据存储单元Partition -> Ledgers -> FragmentsPartition -> Segments
数据一致性保证Ensemble Sizemetadata.broker.list

Write Quorum Size(QW)Replication Factor

Ack Quorum Size(QA)request.required.acks

注:(QW+1)/2 <= QA <= QW <= Ensemble Size <= Bookies Count

2.1.1 数据集合

  • Kafka:topic 切分为多个 partitions,各 partition 以目录形式在 leader broker 及其多副本 brokers 上持久化存储。

  • Pulsar:同样有多个 partitions,但一个 partition 只由一个 broker 负责读写(ownership),而一个 partition 又会均匀分散到多台 bookie 节点上持久化存储。

2.1.2 存储节点

  • Kafka:直接持久化到 broker,由 Client SDK 直接读写。

  • Pulsar:分散持久化到 bookie,由 broker 内嵌的 bookkeeper Client 负责读写。

2.1.3. 一致性保证

  • Kafka:通过多 broker 集群,每个 partition 多副本,producer 指定发送确认机制保证。

  • Pulsar:通过多 broker 集群,broker Quorum Write 到 bookie,返回 Quorum ACK 保证。


2.2 Pulsar 架构

2.3 多租户与 Topic Lookup

2.3.1 多租户

  • topic 分三层:persistent://tenant/namespace/topic,对应划分为 department -> app -> topics,以 namespace 为单位进行过期时间设置,ACL 访问鉴权控制。

  • 优点:按租户进行 topic 资源隔离,并混部在同一集群中,提高集群利用率。

2.3.2 Topic 分配

Broker 的 LoadManager 线程
  • Leader:即 Broker Leader,类似 Kafka Controller,汇总所有 Broker 的负载,合理地分配 topic 分区。

  • Wroker:等待分配 bundle 内的所有 topic partition

bundle 与 ownership
  • 以 Namespace 为单位在 ZK 维护 bundle ring(broker 的数量 2~3 倍),topic 分区按 hash(topic_partition)%N 落到 bundle 中。

  • Broker 唯一绑定到 bundle,就对 bundle 内的所有 topic partition 持有 ownership,用于 Broker Recovery 保证高可用。

Topic 分配流程
  • 上报负载:LoadManager Worker 负责向 ZK 汇报负载指标

    zk> get /loadbalance/brokers/localhost:8080
    {
      "pulsarServiceUrl": "pulsar://localhost:6650",
      "cpu": {
        "usage": 23,
        "limit": 50
      },
      "memory": {
        "usage": 1,
        "limit": 10
      },
      "msgThroughputIn": 100,
      "msgThroughputOut": 100
    }
  • bundle 为单位分配:LoadManager Leader 汇总其他 Brokers 的负载,根据负载分配 bundle

    zk> get /loadbalance/leader
    {"serviceUrl":"http://localhost:8080","leaderReady":false}
  • 分配结果:

    zk> ls /namespace/public/default
    [0x00000000_0x40000000, 0x40000000_0x80000000, 0x80000000_0xc0000000, 0xc0000000_0xffffffff]
    
    zk> get /namespace/public/default/0x80000000_0xc0000000
    {"nativeUrl":"pulsar://localhost:6650","httpUrl":"http://localhost:8080","disabled":false}
设计优点

不同于 kafka 将所有 topic ISR 等元数据记录到 zk,pulsar 只记录 topic 的分区数,不记录 topic 到 broker 的映射关系,zk 元数据数量极少,所以支持百万量级 topic

zk> get /admin/partitioned-topics/public/default/persistent/partitioned-topic-1
{"partitions":2}

2.3.3 Topic Lookup

  • Client 向任一 BrokerA 发起 Lookup 请求,如 persistent://public/default/test-topic-1

  • BrokerA 计算 default namespace 下 hash(topic_partition)%N 的值,得到该 topic partition 对应的 bundle,从而查出 ownership BrokerX

  • BrokerA 返回 owner BrokerX 地址。


2.4 Produce / Consume 策略

2.4.1 三种写路由策略

  • RoundRobinPartition(默认):以 batching 为单位,通过轮询将消息均匀发给 brokers,以获得最大吞吐。

  • SinglePartition

    • 有 KEY 则写固定分区,类似 hash(key) mod len(partitions) 写到指定分区。

    • 无 KEY 则随机选一个分区,写入该 producer 的所有消息。

  • CustomPartition:用户可自定义针对具体到消息的分区策略,如 Java 实现 MessageRouter 接口。


2.4.2 四种读下发策略

  • Exclusive (默认):独占消费,一对一,保证有序消费,能批量 ACK,是 Failover 特例,不保证高可用。

  • Failover:故障转移消费,一对一,备选多,保证有序消费,消费者高可用,能批量 ACK,保证高可用。

  • Shared:共享消费,多对多

    • Round Robin 分发消息,类似 Consumer Group 但不保证有序消费。

    • 只能逐条 ACK:Consumer crash 时才能精确控制消息的重发。

    • 水平扩展 Consumer 直接提读吞吐。不像 kafka 必须先扩 Partition 才能扩 Consumer

  • Key_Shared:按 KEY 共享消费,多对多,Exclusive 和 Shared 的折中模式。

    • KEY hash 相同的消息会被相同 consumer 消费,保证有序消费。

    • 只能逐条 ACK

    • 水平扩展 Consumer 提高读吞吐。

2.4.3 Pull & Push 可选请求模式

  • Consumer 可以同步或异步 p Receive 消息。

  • Consumer 可以本地注册 MessageListener 接口来等待 Broker Push 消息。

2.4.4 Consume ACK 与 unACK

  • 逐条 ACK、批量 ACK

  • 取消 ACK:consumer 消费出错可请求重新消费,发送取消 ACK 后 broker 会重发消息。

    • exclusive, failover:只能取消上一次提交的 ACK,单个 consumer 可控回滚。

    • shared, key_shared:类比 ACK,consumers 只能取消上一条发出的 ACK

与 __consumer_offsets 机制类似 ,Broker 收到各消费者的 ACK 后,会更新 Consumer 的消费进度 cursor,并持久化到特定的 ledger 中。

2.4.5 Data Retention

  • 默认积极保留:最慢的 subscription 堆积的消息都不能被删除,最坏的情况是某个 subscription 下线后,cursor 依旧会保留在 message streaming 中,会导致消息过期机制失效。

  • 消息过期:时间或大小两个维度设置限制,但只对积极保留之前的消息生效

  • TTL:强制移动旧慢 cursor 到 TTL 时间点,若 TTL == Retention,则与 kafka 一样强制过期

两个指标

  • Topic Backlog:最慢的 subscription 的 cursor 到最新一条消息之间的消息数量。

  • Storage Size:topic 总空间。

    • 按 segment 粒度删除,以 Last Motify Time 是否早于 Retention 为标准过期,与 kafka 一致

    • 注:bookie 并非同步过期,空间释放是后台进程定期清理


3. Bookkeeper 架构

append-only 的分布式 KV 日志系统,K 是 (Ledger_id, Entry_id) 二元组,V 是 (MetaData, RawData) 二进制数据。

3.1 概念

3.1.1 特性

  • 高效写:append-only 磁盘顺序写。

  • 高容错:通过 bookie ensemble 对日志进行冗余复制。

  • 高吞吐:直接水平扩展 bookie 提高读写吞吐。

3.1.2 Ensemble Size / Ensembles / Write Quorum / ACK Quorum / Segment(Ledger) / Fragment

  • Ensemble Size:指定一段日志要写的 bookies 数量。

  • Ensembles:指定写一段日志的目标 bookies 集合。

  • Write Quorum:指定一条日志要写的 bookie 数量。

  • ACK Quorum:指定一条日志要确认已写入的 bookie 数量。

  • Segment / Ledger:要写入的一段日志。

  • Fragment:写入的一条日志。

3.1.3 结论

  • Client 会以 Round Robin 的策略挑选出 bookie,依次顺延写 entry

  • Client 只等待 ACK Quorum 个 broker 返回 Append ACK 就认为写成功。

  • 一个 Segment / Ledger 包含多个 Fragment

  • Fragment 内的 entry 呈带状连续分布在 Ensembles Bookies 上。

  • 一个周期内,一台 Bookie 会存储不连续的 (EnsembleSize - WriteQuorum) 条 Entry


3.2 架构

三个组件

  • zk / etcd:强一致性元数据存储

    • 元数据存储:ledger 元数据。

    • 服务发现:bookie 的注册中心,bookie 互相发现,client 读取集群全部 bookie 地址。

  • Bookie:存储节点,只允许 ADD / READ 两个操作,不保证一致性,不保证可用性,功能简单。

  • Client:实现冗余复制的逻辑,保证数据的一致性,实现复杂且最重要。


3.3 写流程

3.3.1 三种文件

  • Journal WAL

    • 概念:用于持久化存储 bookie 操作 ledger 的事务日志,接收来自多个 Ledger Client 写入的不同 ledger entries,直接 高效地 append 到内存,随后 fsync 顺序写磁盘,延迟低。

    • 清理:当 Write Cache 完成 Flush 落盘后自动删除。

  • Entry Logs

    • 概念:真正落盘的日志文件,有序保存不同 ledger 的 entries,并维护 Write Cache 加速热日志的查找。

    • 清理:bookie 后台 GC 线程定期检查其关联的 ledgers 是否在 zk 上已删除,若已删除则自动清理。

  • Index Files

    • 概念:高效顺序写的副作用是,必须在外围维护 (ledger_id, entry_id) 到 Entry_Log 的映射索引,才能实现高效读,故 Flush Cache 时会分离出索引文件。

    • 实现:可选 RocksDB 和文件存储索引。

3.3.2 ADD 操作

  • Clients 混乱地给 Bookie 发来不同 ledger 的日志。

  • Bookie 往追加写 Journal,同时向 Write Cache 有序写(Write Cache 内部使用 SkipList 实现动态有序,同时保证读写都高效)

  • WriteCache 写满后 Flush 分离出 index 文件和落盘的日志文件。

  • 删除旧 Journal,创建新 Journal 继续追加写,如此循环。

3.3.3 结论

broker 内部为每个 ledger 持久化了其存储的 entry logs,并建立索引提高读效率。


3.4 读流程

Client 发来 (ledger_id, entry_id) 的 KEY

  • 热 KEY:在 Write Cache 中则直接返回。

  • 冷 KEY:读取 ledger_id 对应的 index 文件,根据 index 找出 entry_id 对应的 entry log 再返回。

3.4.1 读被均摊

如同轮询写,Cleint 也会轮询 Ensembles 均摊读取,同样不存在 leader 读瓶颈。

3.4.2 读有预期

若某个 Bookie 读响应确实很慢,Client 会向其他副本 Bookie 发起读请求,同时等待,从而保证读延时低。

3.4.3 读结果无序

Client 往 bookie 写是轮询无序地写,故从 Ensembles 中读到是消息是无序的,需在 Client 端自行按 entry_id 重新排序,以保证有序响应。


4. 水平扩容

4.1 水平扩展 Broker

新 Broker 加入集群后,Broker Leader 会将高负载 Broker 的部分 topic ownership 转移给新 Broker,从而分摊读写压力。


4.2 水平扩展 Bookie

新 Bookie 加入集群后,Broker 通过 ZK 感知到,并将 ledger 的新 entry log 写到新 Bookie,提高存储层的读写吞吐、存储容量。

5. Pulsar Consistency


5.1 一致性机制

日志的冗余复制、一致性保证均由 Bookkeeper Client 实现。


5.1.1 冗余副本

由如上的 Eensembles 的 QW 和 QA 的多副本写,保证每条日志确实持久化到了 bookie 中。

5.1.2 一致性机制

滑动窗口:[0, ..., READABLE ... LAC], [LAC+1, ... WAIT_QUOROM ..., LAP]

  • LAP(Last Add Pushed):Client 发出的最后一条 entry_id(从 0 自增的正整数)

  • LAC(Last Add Confirmed):Client 收到的最后一条 ACK 的 entry_id,是一致性的边界。

实现一致性的三个前置条件:

  • 写 ledger 只能以 Append-Only 方式追加写,写满后变为 Read-Only

  • 一个 Ledger 同一时间只会有一个 Client 在写。

  • LAC 必须按照 LAP 的顺序,依次进行 ACK 确认:保证 LAC 作为一致性边界,前边的日志可读,后边的日志等待多副本复制。

5.2 Bookie Auto Recovery:Ensemble Change

5.2.1 场景

bookie crash 下线后,需恢复副本数量。


5.2.2 流程

  • 存在 Leader Bookie 5 作为 Daemon Auditor,不断向其他 Bookies 发送心跳保活。

  • Auditor 发现 Bookie 4 超时,读取 zk 发现 ledger x 的 [0, 7) entry_id 区间需要从 4 转移到新 Bookie

  • 找出负载较小的 Bookie 6,并根据 Ensembles 发现冗余数据分布在 {B1, B2, B3, B5}

  • 按轮询均摊复制读压力的方式,将 entry log 逐一复制到 Bookie 6

  • 复制完毕后修改 ZK 元数据,将 LAC0 的副本 4 替换为 6

5.2.3 结论:Bookie 故障秒级恢复

  • 写请求快速转移:

    Bookie 6 加入 Ensembles 后,直接代替 Bookie 4 继续 Append 日志。因为副本数恢复是各个 Ensembles 内部各节点的 Auditor 线程后台异步复制,不会导致 Client 的写中断,整个 Recovery 过程对 Client 几乎透明。

  • LAC 分界线记录 Ensemble Change 历史:

    在 ZK 的 ledger metadata 中,会记录每次 Recovery 导致的 ensembles 更新,即记录了 ledger 各 entry log 区间的分布情况。
    如下元数据记录了 ledger16 在 LAC46 处,Bookie 3183 下线,随后 Bookie 3182 上线从 LAC47 处继续处理请求:

    > get /ledgers/00/0000/L0016
    ensembleSize: 3
    quorumSize: 2
    ackQuorumSize: 2
    lastEntryId: -1
    state: OPEN
    segment {
      ensembleMember: "10.13.48.57:3185"
      ensembleMember: "10.13.48.57:3184"
      ensembleMember: "10.13.48.57:3183"
      firstEntryId: 0
    }
    segment {
      ensembleMember: "10.13.48.57:3185"
      ensembleMember: "10.13.48.57:3184"
      ensembleMember: "10.13.48.57:3182"
      firstEntryId: 47
    }

注意:右上可看出 ZK 中各 ledger 的元数据硬编码了 Bookie 的 IP,容器部署时若 Bookie 重启后 IP 变化,会导致旧 Ledger 的该副本作废,故在 k8s 上部署时应选择 DaemonSet 或 StatefulSet

5.3 Broker Recovery:Fencing

5.3.1 场景

Broker crash,或 Broker 与 ZK 出现网络分区导致脑裂,需进行 partition ownership 转移。

5.3.2 流程

  • Broker1 心跳超时后,ZK 将 topic partition 的 ownership 转移到 Broker2

  • Broker2 向 Ensemble 发起 Fencing ledger_X 请求,Bookies 纷纷将 ledger_X 置为 Fencing 不可写状态。

  • Broker1 写数据失败收到 FenceException,说明该 partition 已被 Broker 接管,主动放弃 ownership

  • Client 收到异常后与 Broker1 断开连接,进行 Topic Lookup 与 Broker2 建立长连接。

  • 同时,Broker2 对 ledger_X LAC1 之后的 entry log 依次逐一进行 Forwarding Recovery(若 unknow 状态的 entry 副本数实际上已达到 WQ,则认为该 entry 写成功,LAC1 自增为 LAC2)

  • Broker2 更新 ledger_X 的 metadata,将其置为 CLOSE 状态,再创建新 ledger,继续处理 Client 的写请求。

5.3.3 结论:Broker 故障秒级恢复

  • 不复用旧 ledger,降低复杂度
    若复用旧 ledger_X,必须保证所有 ensemble 的 LAC 一致,同时涉及尾部 entry 的强一致复制,逻辑复杂。直接 CLOSE 能保证旧 ledger 不会再被写入。

  • Recovery 逻辑简单,耗时短
    在 Client 的视角,只需等待两个过程:

    等待结束后,直接往新 Broker 的新 ledger 上追加写数据,Broker 不参与任何数据冗余复制的流程,所以是无状态的,可以直接水平扩展提升以提升吞吐。

    • ZK 进行 partition ownership 的转移。

    • 新 Broker 对 UNKNOWN 状态的尾部 entry 进行 Forwarding Recovery

6. Distributed Log 与 Raft

6.1 概念对比

概念RaftDL
roleLeader 与 FollowersWriter (broker) 与 Bookies
failovertermledger_id
replicationMajority AppendEntries RPCQuorum Write
consistencyLast Committed IndexLast Add Confirmed(LAC)
brain splitMajority VoteBroker Fencing

6.2 流程对比

6.3 总结

  • LAC 与 LAP 的存在,使 entry 能以内嵌顺序元数据的方式,均匀分散存储到各台 bookie 中。

  • DL 与 Raft 不同之处在于:

    各 bookie 节点的数据不是从单个节点异步复制而来,而是由 Client 直接轮询分发。

    • 为保证 bookie 能快速 append 日志,bookkeeper 设计了 Journal Append-only 顺序写日志机制。

    • 为保证 bookie 能快速根据 (lid, eid) 读取消息(entry),bookkeeper 设计了 Ledger Store

因此,各 bookie 存储节点的身份是平等的,没有传统一致性算法的 Leader 和 Follower 的概念,完美避开了读写只能走 Leader 导致 Leader 容易成为单点瓶颈的问题。
同时,能直接添加新 Bookie 提升读写吞吐,并降低其他旧 Bookie 的负载。


7. 总结

7.1 Pulsar 的优点

直接解决 Kafka 容器平台现有的手工扩容、故障恢复慢的问题。

  • 稳定性可用性高:秒级 Broker / Bookie 的快速故障恢复。

  • 水平线性扩容:存储与计算分离,可对 Broker 扩容提升读写吞吐,可对 Bookie 扩容降低集群负载并提升存储容量。

  • 扩容负载均衡:Bookie 扩容后新的 ledger 会在新 Bookie 上创建,自动均摊负载。

7.2 Pulsar 的缺点

  • 概念多,系统复杂,隐藏 bug 修复门槛高。

  • 背书少,国内仅腾讯金融和智联招聘在使用。

本文地址:https://yinzige.com/2020/04/24/pulsar-survey/


 

end

Flink 从入门到精通 系列文章

基于 Apache Flink 的实时监控告警系统
关于数据中台的深度思考与总结(干干货)
日志收集Agent,阴暗潮湿的地底世界

公众号(zhisheng)里回复 面经、ClickHouse、ES、Flink、 Spring、Java、Kafka、监控 等关键字可以查看更多关键字对应的文章。
点个赞+在看,少个 bug 👇

以上是关于Apache Pulsar 调研的主要内容,如果未能解决你的问题,请参考以下文章

04_Apache Pulsar的可视化监控管理Apache Pulsar的可视化监控部署

Apache Pulsar 连接被拒绝

跨城实践中,腾讯如何应用 Apache Pulsar

云原生时代顶流消息中间件Apache Pulsar部署实操之Pulsar IO与Pulsar SQL

新一代消息队列 Pulsar

新一代消息队列 Pulsar