新一代消息队列 Pulsar

Posted 腾讯技术工程

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了新一代消息队列 Pulsar相关的知识,希望对你有一定的参考价值。


作者:joylei,腾讯 PCG 后台开发工程师

在信息流场景,内容的请求处理、原子模块调度、结果的分发等至关重要,直接影响到内容的外显、推荐、排序等。基于消息 100% 成功的要求,我们团队对 Pulsar 进行了调研,并采用腾讯云的 TDMQ(Pulsar 版)实现消息的可靠处理。本文主要参考 Pulsar 的官方文档和技术文章,对 Pulsar 的特性、机制、原理等进行整理总结。后续我们团队计划产出多篇文章,重点聚焦分析 Pulsar 与其他消息队列(Kafka、RocketMQ 等) 的调度和写盘等,以及 Pulsar 在信息流内容链路场景的使用实践。


1. Pulsar 概述

Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性,被看作是云原生时代实时消息流传输、存储和计算最佳解决方案。

Pulsar 是一个 pub-sub (发布-订阅)模型的消息队列系统。

1.1. Pulsar 架构

Pulsar 由 Producer、Consumer、多个 Broker 、一个 BookKeeper 集群、一个 Zookeeper 集群构成,具体如下图所示。

  • Producer:数据生成者,即发送消息的一方。生产者负责创建消息,将其投递到 Pulsar 中。
  • Consumer:数据消费者,即接收消息的一方。消费者连接到 Pulsar 并接收消息,进行相应的业务处理。
  • Broker:无状态的服务层,负责接收消息、传递消息、集群负载均衡等操作,Broker 不会持久化保存元数据。
  • BookKeeper:有状态的持久层,负责持久化地存储消息。
  • ZooKeeper:存储 Pulsar 、 BookKeeper 的元数据,集群配置等信息,负责集群间的协调(例如:Topic 与 Broker 的关系)、服务发现等。
  • 从 Pulsar 的架构图上可以看出, Pulsar 在架构设计上采用了计算与存储分离的模式,发布/订阅相关的计算逻辑在 Broker 上完成,而数据的持久化存储交由 BookKeeper 去实现。

    1.1.1. Broker 扩展

    在 Pulsar 中 Broker 是无状态的,当需要支持更多的消费者或生产者时,可以简单地添加更多的 Broker 节点来满足业务需求。Pulsar 支持自动的分区负载均衡,在 Broker 节点的资源使用率达到阈值时,会将负载迁移到负载较低的 Broker 节点,这个过程中分区也将在多个 Broker 节点中做平衡迁移,一些分区的所有权会转移到新的 Broker 节点。在后面 Bundle 小节会具体介绍这部分的实现。

    1.1.2. Bookie 扩展

    存储层的扩容,通过增加 Bookie 节点来实现。在 BooKie 扩容的阶段,由于分片机制,整个过程不会涉及到不必要的数据搬迁,即不需要将旧数据从现有存储节点重新复制到新存储节点。在后续的 Bookkeeper 小节中会具体介绍。

    1.2. Topic

    和其他消息队列类似,Pulsar 中也有 Topic。Topic 即在生产者与消费者中传输消息的通道。消息可以以 Topic 为单位进行归类,生产者负责将消息发送到特定的 Topic,而消费者指定特定的 Topic 进行消费。

    1.2.1. 分区 Topic(Topic-Partition)

    Pulsar 的 Topic 可以分为非分区 Topic 和分区 Topic 。

    普通的 Topic 仅仅被保存在单个 Broker 中,这限制了 Topic 的最大吞吐量。分区 Topic 是一种特殊类型的主题,支持被多个 Broker 处理,从而实现更高的吞吐量。

    针对一个 Topic ,可以设置多个 Topic 分区来提高 Topic 的吞吐量。每个 Topic Partition 由 Pulsar 分配给某个 Broker ,该 Broker 称为该 Topic Partition 的所有者。生成者和消费者会与每个 Topic 分区的 Broker 创建链接,发送消息并消费消息。

    如下图所示, Topic1 有 Partition1、 Partition2、 Partition3、 Partition4 、 Partition5 五个分区, Partition1 和 Partition4 由 Broker1 处理, Partition2 和 Partition5 由 Broker2 处理, Partition3 由 Broker3 处理。

    从 Pulsar 社区版的 golang-sdk 可以看出,客户端的 Producer 和 Consumer 在初始化的时候,都会与每一个 Topic-Partition 创建链接,并且会监听是否有新的 Partition,以创建新的连接。

    1.2.2. 非持久 topic

    默认情况下, Pulsar 会保存所有没确认的消息到 BookKeeper 中。持久 Topic 的消息在 Broker 重启或者 Consumer 出现问题时保存下来。

    除了持久 Topic , Pulsar 也支持非持久 Topic 。这些 Topic 的消息只存在于内存中,不会存储到磁盘。

    因为 Broker 不会对消息进行持久化存储,当 Producer 将消息发送到 Broker 时, Broker 可以立即将 ack 返回给 Producer ,所以非持久 Topic 的消息传递会比持久 Topic 的消息传递更快一些。相对的,当 Broker 因为一些原因宕机、重启后,非持久 Topic 的消息都会消失,订阅者将无法收到这些消息。

    1.2.3. 重试 topic

    由于业务逻辑处理出现异常,消息一般需要被重新消费。Pulsar 支持生产者同时将消息发送到普通的 Topic 和重试 Topic ,并指定允许延时和最大重试次数。当配置了允许消费者自动重试时,如果消息没有被消费成功,会被保存到重试 Topic 中,并在指定延时时间后,重新被消费。

    1.2.4. 死信 topic

    当 Consumer 消费消息出错时,可以通过配置重试 Topic 对消息进行重试,但是,如果当消息超过了最大的重试次数仍处理失败时,该怎么办呢?Pulsar 提供了死信 Topic ,通过配置 deadLetterTopic,当消息达到最大重试次数的时候, Pulsar 会将消息推送到死信 Topic 中进行保存。

    1.3. 订阅(subscription)

    通过订阅的方式,我们可以指定消息如何投递给消费者。

    1.3.1. 订阅类型(Subscription type)

    Pulsar 支持独占(Exclusive)、灾备(Failover)、共享(Shared)、Key_Shared 这四种订阅类型。

  • 独占(Exclusive)SinglePartition

    Exclusive 下,只允许 Subscription 存在一个消费者,如果多个消费者使用同一个订阅名称去订阅同一个 Topic ,则会报错。

    如下图,只有 Consumer A-0 可以消费数据。

  • 灾备(Failover)

    Failover 下,一个 Subscription 中可以有多个消费者,但只有 Master Consumer 可以消费数据。当 Master Consumer 断开连接时,消息会由下一个被选中的 Consumer 进行消费。

    如下图,Consumer-B-0 是 Master Consumer 。当 Consumer -B-0 发生问题与 Broker 断开连接时, Consumer-B-1 将成为下一个 Master Consumer 来消费数据。

  • 分区 Topic:Broker 会按照消费者的优先级和消费名的顺序对消费者进行排序,将 Topic 均匀地分配给优先级最高的消费者。
  • 非分区 Topic:Broker 会根据消费者订阅的非分区 Topic 的时间顺序选择消费者。
  • 共享(Shared)

    Shared 中,多个消费者可以绑定到同一个 Subscription 上。消息通过 round robin 即轮询机制分发给不同的消费者,并且每个消息仅会被分发给一个消费者。当消费者断开连接,所有被发送给消费者但没有被确认的消息将被重新处理,分发给其它存活的消费者。

    如下图, Consumer-C-1 、 Consumer-C-2 、 Consumer-C-3 都可以订阅 Topic 消费数据。

  • Key_Shared

    Key_Shared 中,多个 Consumer 可以绑定到同一个 Subscription 。消息在传递给 Consumer 时,具有相同键的消息只会传递给同一个 Consumer 。

  • 1.3.2. 订阅模式(Subscription modes)

    订阅模式有持久化和非持久化两种。订阅模式取决于游标(cursor)的类型。

    创建订阅时,将创建一个相关的游标来记录最后使用的位置。当订阅的 consumer 重新启动时,它可以从它所消费的最后一条消息继续消费。

  • Durable (持久订阅):游标是持久性的,会保留消息并保持游标记录的位置。当 Broker 重新启动时,可以从 BookKeeper 中恢复游标,消息可以从游标上次记录的位置继续消费。默认情况下,都是持久化订阅。
  • NonDurable(非持久订阅):游标不是持久性的,当 Broker 宕机时,游标会丢失并无法恢复,所以消息无法继续从上次消费的位置开始继续消费。
  • 一个订阅可以有一个或多个消费者。当使用者订阅主题时,它必须指定订阅名称。持久订阅和非持久订阅可以具有相同的名称,它们彼此独立。如果使用者指定了以前不存在的订阅,则会自动创建订阅。

    默认情况下,没有任何持久订阅的 Topic 的消息将被标记为已删除。如果要防止消息被标记为已删除,可以为此 Topic 创建持久订阅。在这种情况下,只有被确认的消息才会被标记为已删除。

    1.3.3. 多主题订阅

    当 Consumer 订阅 Topic 时,默认指定订阅一个主题。从 Pulsar 的 1.23.0-incubating 的版本开始, Pulsar 消费者可以同时订阅多个 Topic 。可以通过两种方式进行订阅:

  • 正则表达式,例如:persistent://public/default/finance-.*
  • 明确指定 Topic 列表
  • 2. Pulsar 生产者 (Producer)

    Producer 是连接 topic 的程序,它将消息发布到一个 Pulsar broker 上。

    2.1. 访问模式

    消息生成者有多种模式访问 Topic ,可以使用以下几种方式将消息发送到 Topic 。

  • Shared:默认情况下,多个生成者可以将消息发送到同一个 Topic。
  • Exclusive:在这种模式下,只有一个生产者可以将消息发送到 Topic ,当其他生产者尝试发送消息到这个 Topic 时,会发生错误。只有独占 Topic 的生产者发生宕机时(Network Partition )该生产者会被驱逐,新的生产者才能产生并向 Topic 发送消息。
  • WaitForExclusive:在这种模式下,只有一个生产者可以将消息发送到 Topic 。当已有生成者和 Topic 建立连接时,其他生产者的创建会被挂起而不会产生错误。如果想要采用领导者选举机制来选择消费者的话,可以采用这种模式。
  • 2.2. 路由模式

    当将消息发送到分区 Topic 时,需要指定消息的路由模式,这决定了消息将会被发送到哪个分区 Topic 。Pulsar 有以下三种消息路由模式,RoundRobinPartition 为默认路由模式。

  • RoundRobinPartition:如果消息没有指定 key,为了达到最大吞吐量,生产者会以 round-robin (轮询)方式将消息发布到所有分区。请注意 round-robin 并不是作用于每条单独的消息,而是作用于延迟处理的批次边界,以确保批处理有效。如果消息指定了 key,分区生产者会根据 key 的 hash 值将该消息分配到对应的分区。这是默认的模式。
  • SinglePartition:如果消息没有指定 key,生产者将会随机选择一个分区,并发布所有消息到这个分区。如果消息指定了 key,分区生产者会根据 key 的 hash 值将该消息分配到对应的分区。
  • CustomPartition:自定义模式,用户可以创建自定义路由模式,通过实现 MessageRouter 接口实现自定义路由规则。
  • 2.3. 批量处理

    Pulsar 支持对消息进行批量处理。批量处理启用后, Producer 会在一次请求中累积并发送一批消息。批量处理时的消息数量取决于最大消息数(单次批量处理请求可以发送的最大消息数)和最大发布延迟(单个请求的最大发布延迟时间)决定。开启批量处理后,积压的数量是批量处理的请求总数,而不是消息总数。

    2.3.1. 索引确认机制

    通常情况下,只有 Consumer 确认了批量请求中的所有消息,这个批量请求才会被认定为已处理。当这批消息没有全部被确认的情况下,发生故障时,会导致一些已确认的消息被重复确认。

    为了避免 Consumer 重复消费已确认的消息, Pulsar 从 Pulsar 2.6.0 开始采用批量索引确认机制。如果启用批量索引确认机制, Consumer 将筛选出已被确认的批量索引,并将批量索引确认请求发送给 Broker 。Broker 维护批量索引的确认状态并跟踪每批索引的确认状态,以避免向 Consumer 发送已确认的消息。当该批信息的所有索引都被确认后,该批信息将被删除。

    默认情况下,索引确认机制处于关闭状态。开启索引确认机制将产生导致更多内存开销。

    2.3.2. key-based batching

    key_shared 模式下,Broker 会根据消息的 key 来分发消息,但默认的批量处理模式,无法保证将所有的相同的 key 都打包到同一批中,而且 Consumer 在接收到批数据时,会默认把第一个消息的 key 当作这批消息的 key ,这会导致消息的错乱。因此 key_shared 模式下,不支持默认的批量处理。

    key-based batching 能够确保 Producer 在打包消息时,将相同 key 的消息打包到同一批中,从而 consumer 在消费的时候,也能够消费到指定 key 的批数据。

    没有指定 key 的消息在打包成批后,这一批数据也是没有 key 的, Broker 在分发这批消息时,会使用 NON_KEY 作为这批消息的 key 。

    2.4. 消息分块

    启用分块后,如果消息大小超过允许发送的最大消息大小时, Producer 会将原始消息分割成多个分块消息,并将分块消息与消息的元数据按顺序发送到 Broker。

    在 Broker 中,分块消息会和普通消息以相同的方式存储在 Ledger 中。唯一的区别是, Consumer 需要缓存分块消息,并在接收到所有的分块消息后将其合并成真正的消息。如果 Producer 不能及时发布消息的所有分块, Consumer 不能在消息的过期时间内接收到所有的分块,那么 Consumer 已接收到的分块消息就会过期。

    Consumer 会将分块的消息拼接在一起,并将它们放入接收器队列中。客户端从接收器队列中消费消息。当 Consumer 消费到原始的大消息并确认后, Consumer 就会发送与该大消息关联的所有分块消息的确认。

    2.4.1. 处理一个 producer 和一个订阅 consumer 的分块消息

    如下图所示,当生产者向主题发送一批大的分块消息和普通的非分块消息时。假设生产者发送的消息为 M1,M1 有三个分块 M1-C1,M1-C2 和 M1-C3。这个 Broker 在其管理的 Ledger 里面保存所有的三个块消息,然后以相同的顺序分发给消费者(独占/灾备模式)。消费者将在内存缓存所有的块消息,直到收到所有的消息块。将这些消息合并成为原始的消息 M1,发送给处理进程。

    2.4.2. 多个生产者和一个生产者处理块消息

    当多个生产者发布块消息到单个主题,这个 Broker 在同一个 Ledger 里面保存来自不同生产者的所有块消息。如下所示,生产者 1 发布的消息 M1,M1 由 M1-C1, M1-C2 和 M1-C3 三个块组成。生产者 2 发布的消息 M2,M2 由 M2-C1, M2-C2 和 M2-C3 三个块组成。这些特定消息的所有分块是顺序排列的,但是其在 Ledger 里面可能不是连续的。这种方式会给消费者带来一定的内存负担。因为消费者会为每个大消息在内存开辟一块缓冲区,以便将所有的块消息合并为原始的大消息。

    3. Pulsar 消费者 (Consumer)

    Consumer 是通过订阅关系连接 Topic ,接收消息的程序。

    Consumer 向 Broker 发送 flow permit request 以获取消息。在 Consumer 端有一个队列,用于接收从 Broker 推送来的消息。

    3.1. 消息确认

    Pulsar 提供两种确认模式:

  • 累积确认:消费者只需要确认最后一条收到的消息,在此之前的消息,都不会被再次发送给消费者。
  • 单条确认:消费者需要确认每条消息并发送 ack 给 Broker 。
  • 如图,上方为累积确认模式,当消费者发送 M12 的确认消息给 Broker 后, Broker 会把 M12 之前的消息和 M12 一样都标记为已确认。下方为单条确认模式,当消费者发送 M7 的确认消息给 Broker 后, Broker 会把 M7 这条消息标记为已确认。当消费者发送 M12 的确认消息给 Broker 后, Broker 会把 M12 这条消息标记为已确认。

    需要注意的是,订阅模式中的 shared 模式是不支持累积确认的。因为该订阅模式下的每个消费者都能消费数据,无法保证单个消费者的消费消息的时序和顺序。

    3.1.1. AcknowledgmentsGroupingTracker

    消息的单条确认和累积确认并不是直接发送确认请求给 Broker,而是把请求转交给 AcknowledgmentsGroupingTracker 处理。

    为了保证消息确认的性能,并避免 Broker 接收到非常高并发的 ack 请求,Tracker 默认支持批量确认,即使是单条消息的确认,也会先进入队列,然后再一批发往 Broker。在创建 consumer 的时候,可以设置 acknowledgementGroupTimeMicros,默认情况下,每 100ms 或者堆积超过 1000 时,AcknowledgmentsGroupingTracker 会发送一批确认请求。如果设置为 0,则每次确认消息后,Consumer 都会立即发送确认请求。

    3.2. 取消确认

    当 Consumer 无法处理一条消息并想重新消费时, Consumer 可以发送一个取消确认的消息给 Broker , Broker 会重新将这条消息发送给 Consumer 。如果启用了批量处理,那这一批中的所有消息都会重新发送给消费者。

    消息取消确认也有单条取消模式和累积取消模式 ,取决于消费者使用的订阅模式。

    在 Exclusive 模式和 Failover 订阅模式中,消费者仅仅只能对收到的最后一条消息进行取消确认。

    在 Shared 和 Key_Shared 的订阅类型中,消费者可以单独否定确认消息。

    如果启用了批量处理,那这一批中的所有消息都会重新发送给消费者。

    3.2.1. NegativeAcksTracker

    取消确认和其他消息确认一样,不会立即请求 Broker,而是把请求转交给 NegativeAcksTracker 进行处理。Tracker 中记录着每条消息以及需要延迟的时间。Tracker 默认是 33ms 左右一个时间刻度进行检查,默认延迟时间是 1 分钟,抽取出已经到期的消息并触发重新投递。Tracker 存在的意义是为了合并请求。另外如果延迟时间还没到,消息会暂存在内存,如果业务侧有大量的消息需要延迟消费,还是建议使用 reconsumeLater 接口。NegativeAck 唯一的好处是不需要每条消息都指定时间,可以全局设置延迟时间。

    3.3. redelivery backoff 机制

    通常情况下可以使用取消确认来达到处理消息失败后重新处理消息的目的,但通过 redelivery backoff 可以更好的实现这种目的。可以通过指定消息重试的次数、消息重发的延迟来重新消费处理失败的消息。

    3.4. 确认超时

    除了取消确认和 redelivery backoff 机制外,还可以通过开启自动重传递机制来处理未确认的消息。启用自动重传递后,client 会在 ackTimeout 时间内跟踪未确认的消息,并在消息确认超时后自动向代理重新发送未确认的消息请求。

  • 如果开启了批量处理,那这批消息都会重新发送给 Consumer 。
  • 与确认超时相比,取消确认会更合适。因为取消确认能更精确地控制单个消息的再交付,并避免在消息处理时引起的超过确认超时而导致无效的再重传。
  • 3.5. 消息预拉取

    Consumer 客户端 SDK 会默认预先拉取消息到 Consumer 本地,Broker 侧会把这些已经推送到 Consumer 本地的消息记录为 pendingAck,这些消息既不会再投递给别的消费者,也不会 ack 超时,除非当前 Consumer 被关闭,消息才会被重新投递。Broker 侧有一个 RedeliveryTracker 接口,这个 Tracker 会记录消息到底被重新投递了多少次,每条消息推送给消费者时,会先从 Tracker 的哈希表中查询一下重投递的次数,和消息一并推送给消费者。

    3.6. 未确认的消息处理

    如果消息被消费者消费后一直没有确认怎么办?

    unAckedMessageTracker 中维护了一个时间轮,时间轮的刻度根据 ackTimeout 、tickDurationInMs 这两个参数生成,每个刻度时间= ackTimeout / tickDurationInMs。新追踪的消息会放入最后一个刻度,每次调度都会移除队列头第一个刻度,并新增一个刻度放入队列尾,保证刻度总数不变。每次调度,队列头刻度里的消息将会被清理,unAckedMessageTracker 会自动把这些消息做重投递。

    重投递就是客户端发送一个 redeliverUnacknowledgedMessages 命令给 Broker。每一条推送给消费者但是未 ack 的消息,在 Broker 侧都会有一个集合来记录(pengdingAck),这是用来避免重复投递的。触发重投递后,Broker 会把对应的消息从这个集合里移除,然后这些消息就可以再次被消费了。

    4. Pulsar 服务端

    Broker 是 Pulsar 的一个无状态组件,主要负责运行以下两个组件:

  • http 服务:提供为生产者和消费者管理任务和 Topic 查找的 REST API。Producer 通过连接到 Broker 来发送消息, Consumer 通过连接到 Broker 来接收消息。
  • 调度器;提供异步 http 服务,用于二进制数据的传输。
  • 4.1. 消息确认与留存

    Pulsar Broker 会默认删除已经被所有 Consumer 确认的消息,并以 backlog 的方式持久化存储所有未被确认的内消息。

    Pulsar 的 message retention(消息留存) 和 message expiry (消息过期)这两个特性可以调整 Broker 的默认设置。

  • Message retention: 保留 Consumer 已确认的消息。

    通过留存规则的设定,可以保证已经被确认且符合留存规则的消息持久地保存在 Pulsar 中,而没有被留存规则覆盖、已经被确认的消息会被删除。

  • Message expire(消息过期):设置未确认消息的存活时长(TTL)。

    通过设置消息的 TTL,有些即使还没有被确认,但已经超过 TTL 的消息,也会被删除。

  • 4.2. 消息去重

    实现消息去重的一种方式是确保消息仅生成一次,即生产者幂等。这种方式的缺点是把消息去重的工作交由应用去做。

    在 Pulsar 中, Broker 支持配置开启消息去重,用户不需要为了消息去重去调整 Producer 的代码。启用消息去重后,即使一条消息被多次发送到 Topic 上,这条消息也只会被持久化到磁盘一次。

    如下图,未开启消息去重时, Producer 发送消息 1 到 Topic 后, Broker 会把消息 1 持久化到 BookKeeper ,当 Producer 又发送消息 1 时, Broker 会把消息 1 再一次持久化到 BookKeeper 。开启消息去重后,当 Producer 再次发送消息 1 时, Broker 不会把消息 1 再一次持久化到磁盘。

    4.2.1. 去重原理

    Producer 对每一个发送的消息,都会采用递增的方式生成一个唯一的 sequenceID,这个消息会放在 message 的元数据中传递给 Broker 。同时, Broker 也会维护一个 PendingMessage 队列,当 Broker 返回发送成功 ack 后, Producer 会将 PendingMessage 队列中的对于的 Sequence ID 删除,表示 Producer 任务这个消息生产成功。Broker 会记录针对每个 Producer 接收到的最大 Sequence ID 和已经处理完的最大 Sequence ID。

    当 Broker 开启消息去重后, Broker 会对每个消息请求进行是否去重的判断。收到的最新的 Sequence ID 是否大于 Broker 端记录的两个维度的最大 Sequence ID,如果大于则不重复,如果小于或等于则消息重复。消息重复时, Broker 端会直接返回 ack,不会继续走后续的存储处理流程。

    4.3. 消息延迟传递

    延时消息功能允许 Consumer 能够在消息发送到 Topic 后过一段时间才能消费到这条消息。在这种机制中,消息在发布到 Broker 后,会被存储在 BookKeeper 中,当到消息特定的延迟时间时,消息就会传递给 Consumer 。

    下图为消息延迟传递的机制。Broker 在存储延迟消息的时候不会进行特殊的处理。当 Consumer 消费消息的时候,如果这条消息设置了延迟时间,则会把这条消息加入 DelayedDeliveryTracker 中,当到了指定的发送时间时,DelayedDeliveryTracker 才会把这条消息推送给消费者。

    4.3.1. 延迟投递原理

    在 Pulsar 中,可以通过两种方式实现延迟投递。分别为 deliverAfter 和 deliverAt。

    deliverAfter 可以指定具体的延迟时间戳,deliverAt 可以指定消息在多长时间后消费。两种方式本质时一样的,deliverAt 方式下,客户端会计算出具体的延迟时间戳发送给 Broker 。

    DelayedDeliveryTracker 会记录所有需要延迟投递的消息的 index 。index 由 Timestamp、 Ledger ID、 Entry ID 三部分组成,其中 Ledger ID 和 Entry ID 用于定位该消息,Timestamp 除了记录需要投递的时间,还用于延迟优先级队列排序。DelayedDeliveryTracker 会根据延迟时间对消息进行排序,延迟时间最短的放在前面。当 Consumer 在消费时,如果有到期的消息需要消费,则根据 DelayedDeliveryTracker index 的 Ledger ID、 Entry ID 找到对应的消息进行消费。如下图, Producer 依次投递 m1、m2、m3、m4、m5 这五条消息,m2 没有设置延迟时间,所以会被 Consumer 直接消费。m1、m3、m4、m5 在 DelayedDeliveryTracker 会根据延迟时间进行排序,并在到达延迟时间时,依次被 Consumer 进行消费。

    4.4. Bundle

    我们知道, Topic 分区会散落在不同的 Broker 中,那 Topic 分区和 Broker 的关系是如何维护的呢?当某个 Broker 负载过高时, Pulsar 怎么处理呢?

    Topic 分区与 Broker 的关联是通过 Bundle 机制进行管理的。

    每个 namespace 存在一个 Bundle 列表,在 namesapce 创建时可以指定 Bundle 的数量。Bundle 其实是一个分片机制,每个 Bundle 拥有 namespace 整个 hash 范围的一部分。每个 Topic (分区) 通过 hash 运算落到相应的 Bundle 区间,进而找到当前区间关联的 Broker 。每个 Bundle 绑定唯一的一个 Broker ,但一个 Broker 可以有多个 Bundle 。

    如下图,T1、T2 这两个 Topic 的 hash 结果落在[0x0000000L——0x4000000L]中,这个 hash 范围的 Bundle 对应 Broker 2, Broker 2 会对 T1、T2 进行处理。

    同理,T4 的 hash 结果落在[0x4000000L——0x8000000L]中,这个 hash 范围的 Bundle 对应 Broker 1, Broker 1 会对 T4 进行处理;

    T5 的 hash 结果落在[0x8000000L——0xC000000L]中,这个 hash 范围的 Bundle 对应 Broker 3, Broker 3 会对 T5 进行处理;

    T3 的 hash 结果落在[0xC000000L——0x0000000L]中,这个 hash 范围的 Bundle 对应 Broker 3, Broker 3 会对 T3 进行处理。

    Bundle 可以根据绑定的 Broker 的负载进行动态的调整、绑定。当 Bundle 绑定的 Broker 的 Topic 数过多、负载过高时,都会触发 Bundle 拆分,将原有的 Bundle 拆分成 2 个 Bundle ,并将其中一个 Bundle 重新分配给不同的 Broker ,以降低原 Broker 的 Topic 数或负载。

    5. Pulsar 存储层(Bookkeeper)

    BookKeeper 是 Pulsar 的存储组件。

    对于 Pulsar 的每个 Topic(分区),其数据并不会固定的分配在某个 Bookie 上,具体的逻辑实现我们在 Bundle 一节已经讨论过,而 Topic 的物理存储,实际上是通过 BookKeeper 组件来实现的。

    5.1. 分片存储

    概念:

  • Bookie:BookKeeper 的一部分,处理需要持久化的数据。
  • Ledger:BookKeeper 的存储逻辑单元,可用于追加写数据。
  • Entry:写入 BookKeeper 的数据实体。当批量生产时,Entry 为多条消息,当非批量生产时,Entry 为单条数据。
  • Pulsar 在物理上采用分片存储的模式,存储粒度比分区更细化、存储负载更均衡。如图,一个分区 Topic-Partition 2 的数据由多个分片组成。每个分片作为 BookKeeper 中的一个 Ledger ,均匀的分布并存储在 BookKeeper 的多个 Bookie 节点中。

    基于分配存储的机制,使得 Bookie 的扩容可以即时完成,无需任何数据复制或者迁移。当 Bookie 扩容时,Broker 可以立刻发现并感知新的 Bookie ,并尝试将新的分片 Segment 写入新增加的 Bookie 中。

    如上图,在 Broker 中,消息以 Entry 的形式追加的形式写入 Ledger 中,每个 Topic 分区都有多个非连续 ID 的 Ledger,Topic 分区的 Ledger 同一时刻只有一个处于可写状态。

    Topic 分区在存储消息时,会先找到当前使用的 Ledger ,生成 Entry ID(每个 Entry ID 在同一个 Ledger 内是递增的)。当 Ledger 的长度或 Entry 个数超过阈值时,新消息会存储到新 Ledger 中。每个 messageID 由[Ledger ID, Entry ID, Partition 编号,batch-index]组成。( Partition :消息所属的 Topic 分区,batch-index:是否为批量消息)

    一个 Ledger 会根据 Topic 指定的副本数量存储到多个 Bookie 中。一个 Bookie 可以存放多个不连续的 Ledger。

    5.2. 读写数据的流程

  • Journals :Journals 文件包含 BookKeeper 的事务日志信息。在对 Ledger 更新之前, Bookie 会保证更新的事务信息已经写入 Journals 。当 Bookie 启动或者旧的 Journals 大小达到阈值时,就会创建一个新的 Journals 。
  • Entry Logs:Entry Logs 管理从 Bookie 收到的 Entry 数据。来自不同 Ledger 的 Entry 会按顺序聚合并写入 Entry Logs ,这些 Entry 在 Entry Logs 中的偏移量会作为指针保存在 Ledger Cache 中,以便快速查找。当 Bookie 启动或者旧的 Entry Logs 大小达到阈值时,就会创建一个新的 Entry Logs 。当旧的 Entry Logs 没有与任何活跃的 Ledger 关联时,就会被垃圾回收器删除。
  • Index Files:每个 Ledger 都会创建一个 Index file,它包括一个头和几个固定长度的 Index page,这些 Index page 记录存储在 Entry Logs 中的 Entry 的偏移量。由于更新索引文件会引入随机的磁盘 I/O,所以索引文件由后台运行的同步线程延迟更新。这确保了更新的快速性能。在索引页持久化到磁盘之前,将它们聚集在 Ledger Cache 中以方便查找。
  • Ledger Cache:Ledger Cache 存放在内存池中,这样可以更高效地管理磁盘头调度。
  • 5.2.1. 消息的写入

    1.将 Entry 追加写入 Ledger 中。

    将这次 Entry 的更新操作写入 Journal 日志中,当由多个数据写入时,可以批量提交,将数据刷到 Journal 磁盘中。

    将 Entry 数据写入写缓存中。

    返回写入成功响应。

    到这里,消息写入的同步流程已经完成。

    3-A. 内存中的 Entry 数据会根据 Ledger 和写入 Ledger 的时间顺序进行排序,批量写入 Entry Log 中。

    3-B. Entry 在 Entry log 中的偏移量以 Index Page 的方式写入 Ledger Cache 中,即 iIdex Files。

    Entry Log 和 Ledger Cache 中的 Index File 会 Flush 到磁盘中。

    5.2.2. 消息的读取

    A.先从写缓存中以尾部读的方式读取。

    B.如果写缓存未命中,则从读缓存中读取。

    C.如果读缓存未命中,则从磁盘中读取。磁盘读取有三步:

    C-1.读取 Index Disk,获取 Entry 的偏移量。

    C-2.根据 Entry 的偏移量,在 Entry Disk 中快速找到 Entry 。

    C-3.将 Entry 数据写入读缓存中。

    6. 参考文献

  • Pulsar 官方文档

  • BookKeeper 官方文档

  • 【MQ Oteam】Apache Pulsar 技术系列 - 客户端消息确认

  • 【MQ Oteam】Apache Pulsar 技术系列 - Message deduplication 这里的去重与你想的可能不一样

  • 【MQ Oteam】Apache Pulsar 技术系列 - Pulsar 延迟消息投递解析

  • Apache 系列—Pulsar 核心特性解析


  • 主流消息队列Kafka和下一代云原生消息平台Pulsar架构分析

    目录

    一、消息队列发展历程

    二、为什么需要消息队列

      如果两个服务之间需要通信,最简单的方案就是直接让它俩之间建立通信就行了。但试想一下,如果所有生产数据的服务和消费数据的服务都要彼此相连,那么系统多了之后,各个系统之间的依赖关系会极其复杂。如果其中的某个系统进行一点修改,那简直是噩梦,真可谓牵一发而动全身。

      而在生产者和消费者之间引入中间件就是为了解决这个问题:消息的产生者不关心消费者是谁,它只需要把消息一股脑丢到消息队列里面,消费者会从消息队列里面消费数据。

      这就是系统设计中一个最简单实用的技巧:加中间层。没有什么问题是加一个中间层服务解决不了的,如果真解决不了,那就加两层。
      细分一下,消费模型又分两种:

        1、点对点模式,也叫队列模式。即每条消息只会被一个消费者消费。

        2、发布订阅(Pub/Sub)模式。发送到某个 Topic 的消息,会分发给所有订阅该 Topic 的消费者进行消费。

      一个成熟的消息队列应该同时支持上述两种消费模型。

      另外,你总不能让生产者的消息「阅后即焚」吧,所以消息队列应该有自己的持久化存储系统,能够把消息存储下来,方便后续的回溯、分析等操作。

      综上,消息队列在整个系统中的主要作用就是:

        1、解耦。使得服务之间的拓扑关系简单了很多。

        2、削峰/异步化。消息队列可以把大量不需要实时处理的数据暂存下来,等待消费者慢慢消费。

      当然,消息队列中间件看似消除了服务之间的互相依赖,但说到底其实是让所有服务都依赖消息队列了,如果消息队列突然坏掉,那就全完蛋了。

      所以我们对这个消息队列本身的要求就非常高,具体来说有几方面:

        1、高性能。消息队列作为整个系统的枢纽,它的性能必须足够高,否则很可能成为整个系统的性能瓶颈。

        2、高可用。说白了就是要抗揍,如果消息队列集群中的少部分节点由于种种原因去世了,也不能影响整个集群的服务。

        3、数据可靠性。在各种极端情况下(比如突然断网、突然断电),要保证已经收到的消息成功储存(一般是指落到磁盘中)。

        4、可扩展。业务是发展的,如果消息队列集群快扛不住计算压力了,就需要更多的计算资源(扩容);如果消息队列集群压力很小,导致很多节点搁那打酱油,那么需要回收计算资源(缩容)。这就需要消息队列在设计时就考虑如何进行灵活的扩缩容。

    三、Kafka架构

      首先说说 Kafka 的架构设计,producer 和 customer 可以选定 Kafka 的某些 topic 中投递和消费消息,但 topic 其实只是个逻辑概念,topic 下面分为多个 partition,消息是真正存在 partition 中的:

      每个 partition 会分配给一个 broker 节点管理:

      所谓 broker 节点,就是一个服务进程。简单来说,你把一个 broker 节点理解为一台服务器,把 partition 理解为这台服务器上的一个文件就行了。

      发到 topic 的消息实际上是发给了某个 broker 服务器,然后被持久化存储到一个文件里,我们一般称这个文件是 log file。

      那么为什么要给一个 topic 分配多个 partition 呢?

      很显然,如果一个 topic 只有一个 partition,那么也就只能有一台 broker 服务器处理这个 topic 上的消息,如果划分成很多 partition,就可以把这个 topic 上的消息分配到多台 broker 的 partition 上,每个 broker 处理消息并将消息持久化存储到 log file 中,从而提高单 topic 的数据处理能力。

      但问题是怎么保证高可用?如果某个 broker 节点挂了,对应的 partition 上的数据不就就无法访问了吗?

      一般都是通过「数据冗余」和「故障自动恢复」来保证高可用,Kafka 会对每个 partition 维护若干冗余副本:

      若干个 partition 副本中,有一个 leader 副本(图中红色的),其余都是 follower 副本(也可称为 replica,图中橙色的)。

      leader 副本负责向生产者和消费者提供订阅发布服务,负责持久化存储消息,同时还要把最新的消息同步给所有 follower,让 follower 小弟们和自己存储的数据尽可能相同。

      这样的话,如果 leader 副本挂了,就能从 follower 中选取一个副本作为新的 leader,继续对外提供服务。

      这就是 Kafka 的设计,一切看起来很完美,是吧?实际上并不完美。

    四、Kafka架构的缺陷

      1、Kafka 把 broker 和 partition 的数据存储牢牢绑定在一起,会产生很多问题。

      首先一个问题就是,Kafka 的很多操作都涉及 partition 数据的全量复制。

      比方说典型的扩容场景,假设有 broker1, broker2 两个节点,它们分别负责若干个 partition,现在我想新加入一个 broker3 节点分摊 broker1 的部分负载,那你得让 broker1 拿一些 partition 给到 broker3 对吧?

      那不好意思,得复制 partition 的全量数据,什么时候复制完,broker3 才能上线提供服务。且不说消耗 IO 以及网络资源,如果复制数据的速度小于 partition 的写入速度,那永远都别想复制完了。

      再比方说,我想给某个 partition 新增一个 follower 副本,那么这个新增的 follower 副本必须要跟 leader 副本同步全量数据。毕竟 follower 存在的目的就是随时替代 leader,所以复制 leader 的全量数据是必须的。

      除此之外,因为 broker 要负责存储,所以整个集群的容量可能局限于存储能力最差的那个 broker 节点。而且如果某些 partition 中的数据特别多(数据倾斜),那么对应 broker 的磁盘可能很快被写满,这又要涉及到 partition 的迁移,数据复制在所难免。

      虽然 Kafka 提供了现成的脚本来做这些事情,但实际需要考虑的问题比较多,操作也比较复杂,数据迁移也很耗时,远远做不到集群的「平滑扩容」。
      2、Kafka 底层依赖操作系统的 Page Cache,会产生很多问题。
    之前说了,只有数据被写到磁盘里才能保证万无一失,否则的话都不能保证数据不会丢。所以首先一个问题就是 Kafka 消息持久化并不可靠,可能丢消息。

      我们知道 Linux 文件系统会利用 Page Cache 机制优化性能。Page Cache 说白了就是读写缓存,Linux 告诉你写入成功,但实际上数据并没有真的写进磁盘里,而是写到了 Page Cache 缓存里,可能要过一会儿才会被真正写入磁盘。

      那么这里面就有一个时间差,当数据还在 Page Cache 缓存没有落盘的时候机器突然断电,缓存中的数据就会永远丢失。

      而 Kafka 底层完全依赖 Page Cache,并没有要求 Linux 强制刷盘,所以突然断电的情况是有可能导致数据丢失的。对于大部分场景来说,可以容忍偶尔丢点数据,但对于金融支付这类服务场景,是绝对不能接受丢数据的。

      另外,虽然我看到很多博客都把 Kafka 依赖 page cache 这个特性看做是 Kafka 的优点,理由是可以提升性能,但实际上 Page Cache 也是有可能出现性能问题的。

      我们来分析下消费者消费数据的情况,主要有两种可能:一种叫追尾读(Tailing Reads),一种叫追赶读(Catch-up Reads)。

      所谓追尾读,顾名思义,就是消费者的消费速度比较快,生产者刚生产一条消息,消费者立刻就把它消费了。我们可以想象一下这种情况 broker 底层是如何处理的:

      生产者写入消息,broker 把消息写入 Page Cache 写缓存,然后消费者马上就来读消息,那么 broker 就可以快速地从 Page Cache 里面读取这条消息发给消费者,这挺好,没毛病。

      所谓追赶读的场景,就是消费者的消费速度比较慢,生产者已经生产了很多新消息了,但消费者还在读取比较旧的数据。

      这种情况下,Page Cache 缓存里没有消费者想读的老数据,那么 broker 就不得不从磁盘中读取数据并存储在 Page Cache 读缓存。

      注意此时读写都依赖 Page Cache,所以读写操作可能会互相影响,对一个 partition 的大量读可能影响到写入性能,大量写也会影响读取性能,而且读写缓存会互相争用内存资源,可能造成 IO 性能抖动。

      再进一步分析,因为每个 partition 都可以理解为 broker 节点上的一个文件,那么如果 partition 的数量特别多,一个 broker 就需要同时对很多文件进行大量读写操作,这性能可就会大幅下降。
      那么,Pulsar 是如何解决 Kafka 的这些问题的呢?

    五、Pulsar 存算分离架构

      首先,Kafka broker 的扩容都会涉及 partition 数据的迁移,这是因为 Kafka 使用的是传统的单层架构,broker 需要同时进行计算(向生产者和消费者提供服务)和存储(消息的持久化)。

      怎么解决?很简单,多叠几层呗。

      Pulsar 改用多层的存算分离架构,broker 节点只负责计算,把存储的任务交给专业的存储引擎 Bookkeeper 来做:

      有了存算分离架构,Pulsar 的 partition 在 broker 之间的迁移完全不会涉及数据复制,所以可以迅速完成。

      为什么呢?你想想「文件」和「文件描述符」的区别就明白了。文件,对应磁盘上的一大块数据,移动起来比较费劲;而文件描述符可以理解为指向文件的一个指针,传递文件描述符显然要比移动文件简单得多。
    在 Kafka 中,我们可以把每个 partition 理解成一个存储消息的大文件,所以在 broker 间转移 partition 需要复制数据,异常麻烦。

      而在 Pulsar 中,我们可以把每个 partition 理解成一个文件描述符,broker 只需持有这个文件描述符即可,把对数据的处理全部甩给存储引擎 Bookkeeper 去做。

      如果 Pulsar 的某个 broker 节点的压力特别大,那你增加 broker 节点去分担一些 partition 就行;类似的,如果某个 broker 节点突然坏了,那你直接把这个 broker 节点管理的 partition 转移到别的 broker 就行了,这些操作完全不涉及数据复制。

      进一步,由于 Pulsar 中的 broker 是无状态的,所以很容易借助 k8s 这样的基础设施实现弹性扩缩容。

      经过这一波操作,broker 把数据存储的关键任务甩给了存储层,那么 Bookkeeper 是如何提供高可用、数据可靠性、高性能、可扩展的特性呢?

    六、Pulsar 节点对等架构

      Bookkeeper 本身就是一个分布式日志存储系统,先说说它是如何实现高可用的。

      Kafka 使用主从复制的方式实现高可用;而 Bookkeeper 采用 Quorum 机制实现高可用。

      Bookkeeper 集群是由若干 bookie 节点(运行着 bookie 进程的服务器)组成的,不过和 Kafka 的主从复制机制不同,这些 bookie 节点都是对等的,没有主从的关系。

      当 broker 要求 Bookkeeper 集群存储一条消息(entry)时,这条消息会被并发地同时写入多个 bookie 节点进行存储:

      还没完,之后的消息会以滚动的方式选取不同的 bookie 节点进行写入:

      这种写入方式称为「条带化写入」,既实现了数据的冗余存储,又使得数据能够均匀分布在多个 bookie 存储节点上,从而避免数据倾斜某个存储节点压力过大。

      因为节点对等,所以 bookie 节点可以进行快速故障恢复和扩容。

      比方说 entry0 ~ entry99 都成功写入到了 bookie1, bookie2, bookie3 中,写 entry100 时 bookie2 突然坏掉了,那么直接加入一个新的 bookie4 节点接替 bookie2 的工作就行了。

      那肯定有读者疑惑,新增进来的 bookie4 难道不需要先复制 bookie2 的数据吗(像 Kafka broker 节点那样)?

      主从复制的架构才需要数据复制,因为从节点必须保证和主节点完全相同,以便随时接替主节点。而节点对等的架构是不需要数据复制的。

      Bookkeeper 中维护了类似这样的一组元数据:

    [bookie1, bookie2, bookie3], 0
    [bookie1, bookie3, bookie4], 100
    

      这组元数据的含义是:entry0 ~ entry99 都写到了 bookie1, bookie2, bookie3 中,entry100 及之后的消息都写到了 bookie1, bookie3, bookie4 中。

      有了这组元数据,我们就能知道每条 entry 具体存在那些 bookie 节点里面,即便 bookie2 节点坏了,这不是还有 bookie1, bookie3 节点可以读取嘛。

      扩容也是类似的,可以直接添加新的 bookie 节点增加存储能力,完全不需要数据复制。

      对比来看,Kafka 是以 partition 为单位存储在 broker 中的:

      Bookkeeper 这边压根没有 partition 的概念,而是以 entry(消息)为单位进行存储,某个 partition 中的数据会被打散在多个 bookie 节点中:

      接下来看看 Bookkeeper 如何保证高性能和数据可靠性。

    七、Pulsar 读写隔离架构

      bookie 节点实现读写隔离,自己维护缓存,不再依赖操作系统的 Page Cache,保证了数据可靠性和高性能。

      之前说到 Kafka 完全依赖 Page Cache 产生的一些问题,而 Bookkeeper 集群中的 bookie 存储节点采用了读写隔离的架构:

      每个 bookie 节点都拥有两块磁盘,其中 Journal 磁盘专门用于写入数据,Entry Log 磁盘专门用于读取数据,而 memtable 是 bookie 节点自行维护的读写缓存。

      其中 Journal 盘的写入不依赖 Page Cache,直接强制刷盘(可配置),写入完成后 bookie 节点就会返回 ACK 写入成功。

      写 Journal 盘的同时,数据还会在 memotable 缓存中写一份,memotable 会对数据进行排序,一段时间后刷入 Entry Log 盘。

      这样不仅多了一层缓存,而且 Entry Log 盘中的数据有一定的有序性,在读取数据时可以一定程度上提高性能。

      这样设计的缺点是一份数据要存两次,消耗磁盘空间,但优势也很明显:

        1、保证可靠性,不会丢数据。因为 Journal 落盘后才判定为写入成功,那么即使机器断电,数据也不会丢失。

        2、数据读写不依赖操作系统的 Page Cache,即便读写压力较大,也可以保证稳定的性能。

        3、可以灵活配置。因为 Journal 盘的数据可以定时迁出,所以可以采用存储空间较小但写入速度快的存储设备来提高写入性能。

    八、总结

      以上介绍了 Kafka 的架构及痛点,并介绍了 Pulsar 是如何在架构层面解决 Kafka 的不足的。当然,以上只是 Pulsar 的大体设计,具体到实现必然有很多细节和难点,不是一篇文章能讲完的,后续我还会分享我的学习经验。

      Pulsar 还有很多优秀特性,比如多租户、跨地域复制等企业级特性,比如更灵活的消费模型,比如批流融合的尝试等等,这些特性可以查看 Pulsar 的官网:https://pulsar.apache.org/

    以上是关于新一代消息队列 Pulsar的主要内容,如果未能解决你的问题,请参考以下文章

    新一代消息队列 Pulsar

    新一代消息队列 Pulsar

    最火的实时计算框架Flink和下一代分布式消息队列Pulsar的批流融合

    分布式消息队列Apache Pulsar

    被视为代替Kafka的消息队列:Apache Pulsar设计简介

    Kafka与Pulsar的区别在哪?为什么会成为下一代的消息中间件之王?