详解Apache Pulsar的Topic绑定Broker

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了详解Apache Pulsar的Topic绑定Broker相关的知识,希望对你有一定的参考价值。

参考技术A 众所周知,一个传统单套的Kafka集群基本上支持1000个左右的topic。

这是因为Kafka严重依赖于ZooKeeper集群。所有的broker在启动的时候都会往zookeeper进行注册,目的就是选举出一个controller,controller会读取注册上来的从节点的数据(通过监听机制),生成集群的元数据信息,之后把这些信息都分发给其他的服务器,让其他服务器能感知到集群中其它成员的存在 。

从集群规模的角度来看,限制 Kafka 集群规模的一个核心指标就是集群可承载的分区数。集群的分区数对集群的影响主要有两点:ZooKeeper 上存储的元数据量和控制器变动效率。

Kafka 集群依赖于一个单一的 Controller 节点来处理绝大多数的 ZooKeeper 读写和运维操作,并在本地缓存所有 ZooKeeper 上的元数据。分区数增加,ZooKeeper 上需要存储的元数据就会增加,从而加大 ZooKeeper 的负载,给 ZooKeeper 集群带来压力,可能导致 Watch 的延时或丢失。

当 Controller 节点出现变动时,需要进行 Leader 切换、Controller 节点重新选举等行为,分区数越多需要进行越多的 ZooKeeper 操作:比如当一个 Kafka 节点关闭的时候,Controller 需要通过写 ZooKeeper 将这个节点的所有 Leader 分区迁移到其他节点;新的 Controller 节点启动时,首先需要将所有 ZooKeeper 上的元数据读进本地缓存,分区越多,数据量越多,故障恢复耗时也就越长。

这也是新版本的Kafka抛弃Zookeeper的原因, 当然在Apache Pulsar在设计之除就考虑到了这个问题进而设计出了存算分离架构。

和一般的MQ不同,pulsar的数据不直接存储在Broker上,而是保存在最下面的BookKeeper集群中。Broker主要负责Pulsar的业务逻辑,BookKeeper只负责数据的存储。 这样也就导致Pulsar集群可以支持几十万个Topic。 由于Broker是无状态的,所以它可以很方便地实现容量的动态伸缩,另外Broker内含有一个BookKeeper Client方便与BookKeeper集群建立连接。Broker是Pulsar的逻辑中心,它是数据流和管理流的入口。 Producer选择某一Broker建立连接,生产的消息存储在该Broker管理的BookKeeper 存储节点(Bookie)。

当我们通过Pulsar客户端或者系统自动生成Topic时候需要选择Topic类型:non-partitioned或者partitioned,如果是non-partitioned类型需要选择一个broker进行绑定,反之需要根据设置的分片数选择多个broker进行绑定。

为了让Pulsar单集群可以承载几十万个Topic/Partition,Pulsar抽象了一个Bundle的概念,并且使用了 一致性哈希算法 。Topic并不会直接与Broker建立联系,让Bundle作为一致性哈希环中的虚拟节点,Topic通过名称计算哈希值,并且散列到哈希环中,找到对应的Bundle,然后通过Bundle和Broker建立连接。即ZooKeeper中保存Bundle和Broker之间的联系,Bundle归属与哪个Broker,也是通过一致性哈希动态计算出来的。

参考

https://zhuanlan.zhihu.com/p/98030096

Linux MacBook单机部署Pulsar并开启认证功能

Pulsar简单介绍

Pulsar 是一个用于服务器到服务器的消息系统,具有多租户、高性能等优势。 Pulsar 最初由 Yahoo 开发,目前由 Apache 软件基金会管理。

Pulsar 的关键特性如下:

  • Pulsar 的单个实例原生支持多个集群,可跨机房在集群间无缝地完成消息复制。

  • 极低的发布延迟和端到端延迟。

  • 可无缝扩展到超过一百万个 topic。

  • 简单的客户端 API,支持 Java、Go、Python 和 C++。

  • 支持多种 topic 订阅模式(独占订阅、共享订阅、故障转移订阅)。

  • 通过 Apache BookKeeper 提供的持久化消息存储机制保证消息传递 。

  • 由轻量级的 serverless 计算框架 Pulsar Functions 实现流原生的数据处理。

  • 基于 Pulsar Functions 的 serverless connector 框架 Pulsar IO 使得数据更易移入、移出 Apache Pulsar。

  • 分层式存储可在数据陈旧时,将数据从热存储卸载到冷/长期存储(如S3、GCS)中。

官方文档地址:https://pulsar.apache.org/docs/zh-CN/concepts-overview/

Pulsar在Linux或MacBook上安装

Pulsar是java开发的,只需要安装jdk环境

MacBook安装jdk8

Linux卸载openjdk并安装Oracle jdk

官方下载地址:https://pulsar.apache.org/zh-CN/download/

#下载pulsar
wget https://archive.apache.org/dist/pulsar/pulsar-2.8.0/apache-pulsar-2.8.0-bin.tar.gz

#解压
tar xvfz apache-pulsar-2.8.0-bin.tar.gz

#进入目录
cd apache-pulsar-2.8.0

#启动单机模式 Pulsar  会自动启动一个zookeeper
bin/pulsar standalone

发送和订阅数据

使用Pulsar-client 工具

#进入目录
cd apache-pulsar-2.8.0

#消费者订阅: 在first-subscription 订阅中 consume 一条消息到topic:my-topic 的命令
bin/pulsar-client consume my-topic -s "first-subscription"

#开启另一个终端发送数据,注意观察订阅者收到数据:content:hello-pulsar
#生产者发送: 向名称为 my-topic 的 topic 发送一条简单的消息 hello-pulsar
bin/pulsar-client produce my-topic --messages "hello-pulsar"

开启认证功能

#生成秘钥
bin/pulsar tokens create-secret-key --output /Users/liang/software/apache-pulsar-2.8.0/my-secret.key --base64

#创建Token,记住生成的token
bin/pulsar tokens create --secret-key file:///Users/liang/software/apache-pulsar-2.8.0/my-secret.key --subject admin
eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJhZG1pbiJ9.UCsEs8p8E_7OiTvCuf0rRvGP26ZPFunnSEbjGsynVqM

#修改配置,开启token认证
vim conf/standalone.conf

#启用认证
authenticationEnabled=true
authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken

tokenSecretKey=file:///Users/liang/software/apache-pulsar-2.8.0/my-secret.key

#broker与broker之间的相互通信(如果不配置报错:HTTP 401 Unauthorized BookKeeper client is closed)
brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
brokerClientAuthenticationParameters=token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJhZG1pbiJ9.UCsEs8p8E_7OiTvCuf0rRvGP26ZPFunnSEbjGsynVqM

#重启standalone
bin/pulsar standalone

#在次执行订阅报错: Unable to authenticate 没有权限
bin/pulsar-client consume my-topic -s "first-subscription"

#执行:获取租户列表: HTTP 401 Unauthorized
bin/pulsar-admin tenants list

#修改客户端配置
vim conf/client.conf
authPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
authParams=token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJhZG1pbiJ9.UCsEs8p8E_7OiTvCuf0rRvGP26ZPFunnSEbjGsynVqM

#在次执行订阅成功
bin/pulsar-client consume my-topic -s "first-subscription"

#执行:获取租户列表成功
bin/pulsar-admin tenants list
#成功显示
"public"
"sample"

参考链接:
https://www.jianshu.com/p/dd328bdd2a32
https://blog.csdn.net/wt334502157/article/details/117414153

以上是关于详解Apache Pulsar的Topic绑定Broker的主要内容,如果未能解决你的问题,请参考以下文章

06.Apache Pulsar的JAVA API相关使用操作,基于Pulsar实现Topic的构建操作,使用JAVA如何管理租户/namespace/Topic,基于Pulsar实现数据生产/消费

Pulsar 中的 topic-partition-broker 和 segment-topic 信息保存在哪里?

在 Apache Pulsar 中使用注册模式发布到主题

Apache Pulsar 调研

Apache Pulsar MQ 学习笔记

[Pulsar系列] 10分钟学会Pulsar消息系统概念