05_Pulsar的主要组件介绍与命令使用名称空间Pulsar的topic相关操作Pulsar Topic(主题)相关操作_高级操作

Posted 涂作权的博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了05_Pulsar的主要组件介绍与命令使用名称空间Pulsar的topic相关操作Pulsar Topic(主题)相关操作_高级操作相关的知识,希望对你有一定的参考价值。

1.5.Apache Pulsar的主要组件介绍与命令使用
1.5.1.多租户模式
1.5.1.1. 什么是多租户
1.5.1.2.Pulsar多租户的相关特征_安全性(认证和授权)
1.5.1.3.Pulsar多租户的相关特性_隔离性
1.5.1.4.Pulsar多租户的相关操作
1-获取租户列表
2-创建租户
3-获取配置
4-更新配置
5-删除租户

====================
1.5.2.Pulsar的名称空间
1.5.2.1.什么是名称空间
1.5.2.2.Pulsar NameSpace(名称空间)相关操作_基本操作
1-在指定的租户下创建名称空间
2-获取所有的名称空间列表
3-删除名称空间


1.5.2.3.Pulsar Namespace(名称空间) 相关操作_高级操作
1-获取名称空间相关的配置策略
2-配置复制集群
2.1 设置backlog quota策略
2.2-获取给定命名空间复制集群的列表


3-配置backlog quota策略
3.1 - 获取backlog quota策略
3.2 - 获取backlog quota策略
3.3 - 移除backlog quota策略


4-配置持久化策略
4.1-设置持久化策略
4.2-获取持久化策略


5-配置消息存活时间(TTL)
5.1-设置消息存活时间
5.2 - 获取消息的存活时间
5.3 - 删除消息的存活时间


6 - 配置整个名称空间中Topic的消息发送速率。
6.1-设置Topic的消息发送的速率
6.2 获取topic的消息发送速率


7-配置整个名称空间中Topic的消息收速率
7.1-设置Topic的消息接收速率
7.2 获取topic的消息接收速率


8-配置整个名称空间中Topic的复制集群的速率
8.1-设置Topic的消息复制集群的速率
8.2 获取topic的消息复制集群的速率

====================
1.5.3.Pulsar的topic相关操作
1.5.3.1.什么是Topic
1.5.3.2. Pulsar Topic(主题)相关操作_基础操作
1-创建Topic
2 - 列出当前某个名称空间下的所有Topic
3-更新Topic操作
4-删除Topic操作

====================
1.5.3.3.Pulsar Topic(主题)相关操作_高级操作
1-授权
2-获取权限
3-取消权限

1.5.Apache Pulsar的主要组件介绍与命令使用

1.5.1.多租户模式

1.5.1.1. 什么是多租户

多租户是一种架构,目的是为了让多用户环境下使用同一套程序,且保证用户间数据隔离。
简单讲:在一台服务器上运行单个应用实例,它为多个租户(客户)提供服务。

Apache Pulsar 最初诞生于雅虎,当时就是为了解决雅虎内部各个部门之间数据的协调,所以多租户特性显得至关重用,Pulsar 从诞生之日起就考虑到多租户这一特性,并在后续的实现过程中,将其不断的完善。多租户这一特性,使得各个部门之间可以共享同一份数据,不用单独部署独立的系统来操作数据,很好的保证了各部门间数据一致性的问题,同时简化维护成本

Pulsar 的多租户设计符合上述要求:

  • 使用身份验证、授权和 ACL(访问控制列表)确保其安全性
  • 为每个租户强制执行存储配额
  • 支持在运行时更改隔离机制,从而实现操作成本低和管理简单

Pulsar的多租户性质主要体现在topic的URL中, 其结构如下:

persistent://tenant/namespace/topic

从URL中可以看出tenant(租户)是topic最基本的单元(比命名空间和topic名称更为基本)

1.5.1.2.Pulsar多租户的相关特征_安全性(认证和授权)

一个多租户系统需要在租户内提供系统级别的安全性,细分来讲,主要可以归类为以下两点:

  • 租户只能访问它有权限访问的topics
  • 不允许访问它无法访问的topics

在 Pulsar 中,多租户的安全性是通过身份验证和授权机制实现的。当 client 连接到 pulsar broker 时,broker 会使用身份验证插件来验证此客户端的身份,然后为其分配一个 string 类型的 role token。role token 主要有如下作用:

  • 判断client是否有对topics进行生产和消费消息的权限
  • 管理租户属性的配置

Pulsar目前支持以下几种身份认证,同时支持自定义实现自己的身份认证程序

  • TLS客户端身份认证
  • 雅虎的身份证系统:Athenz
  • Kerberos
  • JSON Web Token认证

1.5.1.3.Pulsar多租户的相关特性_隔离性

  • 软隔离:通过磁盘配额,流量控制和限制等手段
存储:
Apache Pulsar使用Bookkeeper来作为其存储层,bookie是Bookkeeper的实例,Bookkeeper本身就是具有I/O分离(读写分离)的特征,可以很多的做好IO隔离,提升读写的效率。

同时, 不同的租户可以为不同的NameSpace配置不同的存储配额, 当租户内消息的大小达到了存储配额的限制, Pulsar会 
采取相应的措施, 例如: 阻止消息生成, 抛异常 或丢弃数据等。

Broker:
每个Borker使用的内存资源都是有上限的, 当Broker达到配置的CPU或内存使用的阈值后, Pulsar会迅速的将流量转移到负载较小的Broker处理
在生产和消费方面, Pulsar都可以进行流量控制,租户可以配置发送和接收的速率,避免出现一个客户端占用当Broker 
的所有处理资源
  • 硬隔离:物理资源隔离
    Pulsar 允许将某些租户或名称空间与特定 Broker 进行隔离。这可确保这些租户或命名空间可以充分利用该特定 Broker 上的资源。

1.5.1.4.Pulsar多租户的相关操作

  • 1-获取租户列表
[root@node1 bin]# cd /export/server/pulsar-2.8.1/bin/
[root@node1 bin]# ./pulsar-admin tenants list
"public"
"pulsar"
[root@node1 bin]#
  • 2-创建租户
[root@node1 bin]# ./pulsar-admin tenants create my-tenant
[root@node1 bin]# ./pulsar-admin tenants list
"my-tenant"
"public"
"pulsar"
[root@node1 bin]#

在创建租户时,可以使用-r或者–admin-roles标志分配管理角色。可以用逗号分隔的列表指定多个角色。

[root@node1 bin]# ./pulsar-admin tenants create my-tenant2 --admin-roles role1,role2,role3
[root@node1 bin]# ./pulsar-admin tenants list
"my-tenant"
"my-tenant2"
"public"
"pulsar"
[root@node1 bin]# ./pulsar-admin tenants create my-tenant3 -r role1
[root@node1 bin]#
  • 3-获取配置
[root@node1 bin]# ./pulsar-admin tenants get my-tenant

  "adminRoles" : [ ],
  "allowedClusters" : [ "pulsar-cluster" ]

[root@node1 bin]# ./pulsar-admin tenants get my-tenant2

  "adminRoles" : [ "role1", "role2", "role3" ],
  "allowedClusters" : [ "pulsar-cluster" ]

[root@node1 bin]#
  • 4-更新配置
[root@node1 bin]# ./pulsar-admin tenants update my-tenant
[root@node1 bin]# ./pulsar-admin tenants get my-tenant

  "adminRoles" : [ ],
  "allowedClusters" : [ "pulsar-cluster" ]

[root@node1 bin]# ./pulsar-admin tenants update my-tenant -r role1
[root@node1 bin]# ./pulsar-admin tenants get my-tenant

  "adminRoles" : [ "role1" ],
  "allowedClusters" : [ "pulsar-cluster" ]

[root@node1 bin]#
  • 5-删除租户
    注意:在删除的时候,如果库下已经有名称空间,是无法删除的,需要先删除名称空间。
[root@node1 bin]# ./pulsar-admin tenants list
"my-tenant"
"my-tenant2"
"my-tenant3"
"public"
"pulsar"
[root@node1 bin]# pwd
/export/server/pulsar-2.8.1/bin
[root@node1 bin]# ./pulsar-admin tenants delete my-tenant3
[root@node1 bin]# ./pulsar-admin tenants list
"my-tenant"
"my-tenant2"
"public"
"pulsar"
[root@node1 bin]# ./pulsar-admin tenants delete my-tenant
[root@node1 bin]#

1.5.2.Pulsar的名称空间

1.5.2.1.什么是名称空间

namespace是Pulsar中最基本的管理单元,在namespace这一层面,可以设置权限,调整副本设置,管理跨集群的消息复制,控制消息策略和执行关键操作。一个主题topic可以继承其所对应的namespace的属性,因此我们只需对namespace的属性进行设置,就可以一次性设置该namespace中所有主题topic的属性。

namespace有两种,分别是本地的namespace和全局的namespace:

  • 本地namespace——仅对定义它的集群可见。
  • 全局namespace——跨集群可见,可以是同一个数据中心的集群,也可以是跨地域中心的集群,这依赖于是否在namespace中设置了跨集群拷贝数据的功能。

虽然本地namespace和全局namespace的作用域不同,但是只要对他们进行适当的设置,都可以跨团队和跨组织共享。一
旦生产者获得了namespace的写入权限,那么它就可以往namespace中的所有topic主题写入数据,如果某个主题不存在,则在生产者第一次写入数据时动态创建。

1.5.2.2.Pulsar NameSpace(名称空间)相关操作_基本操作

  • 1-在指定的租户下创建名称空间
[root@node1 bin]# pwd
/export/server/pulsar-2.8.1/bin
[root@node1 bin]# ./pulsar-admin tenants create test-tenant
[root@node1 bin]# ./pulsar-admin tenants list
"my-tenant"
"my-tenant2"
"public"
"pulsar"
"test-tenant"
[root@node1 bin]# ./pulsar-admin namespaces create test-tenant/test-namespace
[root@node1 bin]#
  • 2-获取所有的名称空间列表
[root@node1 bin]# pwd
/export/server/pulsar-2.8.1/bin
[root@node1 bin]# ./pulsar-admin namespaces list test-tenant
"test-tenant/test-namespace"
[root@node1 bin]# ./pulsar-admin namespaces create test-tenant/ns1
[root@node1 bin]# ./pulsar-admin namespaces list test-tenant
"test-tenant/ns1"
"test-tenant/test-namespace"
[root@node1 bin]#
  • 3-删除名称空间
[root@node1 bin]# ./pulsar-admin namespaces list test-tenant
"test-tenant/ns1"
"test-tenant/test-namespace"
[root@node1 bin]# pwd
/export/server/pulsar-2.8.1/bin
[root@node1 bin]# ./pulsar-admin namespaces delete test-tenant/ns1
[root@node1 bin]# ./pulsar-admin namespaces list test-tenant
"test-tenant/test-namespace"
[root@node1 bin]#

1.5.2.3.Pulsar Namespace(名称空间) 相关操作_高级操作

  • 1-获取名称空间相关的配置策略
[root@node1 bin]# pwd
/export/server/pulsar-2.8.1/bin
[root@node1 bin]# ./pulsar-admin namespaces policies test-tenant/test-namespace

  "auth_policies" : 
    "namespace_auth" :  ,
    "destination_auth" :  ,
    "subscription_auth_roles" :  
  ,
  "replication_clusters" : [ "pulsar-cluster" ],
  "bundles" : 
    "boundaries" : [ "0x00000000", "0x40000000", "0x80000000", "0xc0000000", "0xffffffff" ],
    "numBundles" : 4
  ,
  "backlog_quota_map" :  ,
  "clusterDispatchRate" :  ,
  "topicDispatchRate" :  ,
  "subscriptionDispatchRate" :  ,
  "replicatorDispatchRate" :  ,
  "clusterSubscribeRate" :  ,
  "publishMaxMessageRate" :  ,
  "latency_stats_sample_rate" :  ,
  "deleted" : false,
  "encryption_required" : false,
  "subscription_auth_mode" : "None",
  "offload_threshold" : -1,
  "schema_auto_update_compatibility_strategy" : "Full",
  "schema_compatibility_strategy" : "UNDEFINED",
  "is_allow_auto_update_schema" : true,
  "schema_validation_enforced" : false,
  "subscription_types_enabled" : [ ],
  "properties" :  

[root@node1 bin]#
  • 2-配置复制集群
[root@node1 bin]# ./pulsar-admin namespaces create test-tenant/ns1
[root@node1 bin]# ./pulsar-admin namespaces get-clusters test-tenant/ns1
"pulsar-cluster"

# 2.1 设置backlog quota策略
[root@node1 bin]# ./pulsar-admin namespaces set-clusters test-tenant/ns1 --clusters cl2
12:01:55.990 [AsyncHttpClient-7-1] WARN  org.apache.pulsar.client.admin.internal.BaseResource - [http://node1:8080/admin/v2/namespaces/test-tenant/ns1/replication] Failed to perform http post request: javax.ws.rs.ForbiddenException: HTTP 403 Forbidden
Invalid cluster id: cl2

Reason: Invalid cluster id: cl2
[root@node1 bin]#
# 2.2-获取给定命名空间复制集群的列表
[root@node1 bin]# ./pulsar-admin namespaces get-clusters test-tenant/test-namespace
"pulsar-cluster"
[root@node1 bin]#
  • 3-配置backlog quota策略
    待定配额帮助Broker在某个名称空间达到某个阈值限制时限制其带宽/存储。管理员可以设置限制,并在达到限制后采取相应的行动。

3.1 - 获取backlog quota策略

[root@node1 bin]# pwd
/export/server/pulsar-2.8.1/bin
[root@node1 bin]# ./pulsar-admin namespaces set-backlog-quota --limit 10G --limitTime 36000 --policy producer_request_hold test-tenant/ns1

--policy 的值选择: 
producer_request_hold:broker暂停运行,并不再持久化生产请求负载。
producer_exception:broker抛出异常,并与客户端断开连接。
consumer_backlog_eviction:broker 丢弃积压消息。

3.2 - 获取backlog quota策略

[root@node1 bin]# ./pulsar-admin namespaces get-backlog-quotas test-tenant/ns1
"destination_storage    BacklogQuotaImpl(limitSize=10737418240, limitTime=36000, policy=producer_request_hold)"

3.3 - 移除backlog quota策略

[root@node1 bin]# pwd
/export/server/pulsar-2.8.1/bin
[root@node1 bin]# ./pulsar-admin namespaces get-backlog-quotas test-tenant/ns1
"destination_storage    BacklogQuotaImpl(limitSize=10737418240, limitTime=36000, policy=producer_request_hold)"
[root@node1 bin]# ./pulsar-admin namespaces remove-backlog-quota test-tenant/ns1
[root@node1 bin]# ./pulsar-admin namespaces get-backlog-quotas test-tenant/ns1
[root@node1 bin]#
  • 4-配置持久化策略
    持久化策略可以给定命名空间下topic上的所有消息配置持久等级。

4.1-设置持久化策略

[root@node1 bin]# ./pulsar-admin namespaces set-persistence --bookkeeper-ack-quorum 2 --bookkeeper-ensemble 3 --bookkeeper-write-quorum 2 --ml-mark-delete-max-rate 0 test-tenant/ns1

参数说明:
bookkeeper-ack-quorum:每个entry 在等待的 acks(有保证的副本)数量,默认值:0 
bookkeeper-ensemble:单个topic 使用的 bookie 数量,默认值:0 
bookkeeper-write-quorum:每个entry 要写入的次数,默认值:0 
ml-mark-delete-max-rate:标记-删除操作的限制速率(0表示无限制),默认值:0.0

4.2-获取持久化策略

[root@node1 bin]# ./pulsar-admin namespaces get-persistence test-tenant/ns1

  "bookkeeperEnsemble" : 3,
  "bookkeeperWriteQuorum" : 2,
  "bookkeeperAckQuorum" : 2,
  "managedLedgerMaxMarkDeleteRate" : 0.0

[root@node1 bin]#
  • 5-配置消息存活时间(TTL)
    以秒为单位

5.1-设置消息存活时间

[root@node1 bin]# ./pulsar-admin namespaces set-message-ttl --messageTTL 100 test-tenant/ns1

5.2 - 获取消息的存活时间

[root@node1 bin]# ./pulsar-admin namespaces get-message-ttl test-tenant/ns1
100

5.3 - 删除消息的存活时间

[root@node1 bin]# ./pulsar-admin namespaces get-message-ttl test-tenant/ns1
100
[root@node1 bin]# ./pulsar-admin namespaces remove-message-ttl test-tenant/ns1
[root@node1 bin]# ./pulsar-admin namespaces get-message-ttl test-tenant/ns1
null
[root@node1 bin]#

6 - 配置整个名称空间中Topic的消息发送速率。
6.1-设置Topic的消息发送的速率

[root@node1 bin]# ./pulsar-admin namespaces set-dispatch-rate test-tenant/ns1 \\
> --msg-dispatch-rate 1000 \\
> --byte-dispatch-rate 1048576 \\
> --dispatch-rate-period 1

参数说明:
--msg-dispatch-rate : 每dispatch-rate-period秒钟发送的消息数量 
--byte-dispatch-rate : 每dispatch-rate-period秒钟发送的总字节数 
--dispatch-rate-period : 设置发送的速率, 比如 1 表示 每秒钟

6.2 获取topic的消息发送速率

[root@node1 bin]# ./pulsar-admin namespaces get-dispatch-rate test-tenant/ns1

  "dispatchThrottlingRateInMsg" : 1000,
  "dispatchThrottlingRateInByte" : 1048576,
  "relativeToPublishRate" : false,
  "ratePeriodInSecond" : 1

[root@node1 bin]#

7-配置整个名称空间中Topic的消息收速率
7.1-设置Topic的消息接收速率

./pulsar-admin namespaces set-subscription-dispatch-rate test-tenant/ns1 \\
--msg-dispatch-rate 1000 \\
--byte-dispatch-rate 1048576 \\
--dispatch-rate-period 1

参数说明:
–msg-dispatch-rate: 每dispatch-rate-period秒钟接收的消息数据。
–byte-dispatch-rate: 每dispatch-rate-period秒钟接收的总字节数
–dispatch-rate-period: 设置接收的速率,比如1表示 每秒钟

7.2 获取topic的消息接收速率

[root@node1 bin]# ./pulsar-admin namespaces get-subscription-dispatch-rate test-tenant/ns1

  "dispatchThrottlingRateInMsg" : 1000,
  "dispatchThrottlingRateInByte" : 1048576,
  "relativeToPublishRate" : false,
  "ratePeriodInSecond" : 1

8-配置整个名称空间中Topic的复制集群的速率
8.1-设置Topic的消息复制集群的速率

[root@node1 bin]# ./pulsar-admin namespaces set-replicator-dispatch-rate test-tenant/ns1 \\
> --msg-dispatch-rate 1000 \\
> --byte-dispatch-rate 1048576 \\
> --dispatch-rate-period 1

参数说明:
–msg-dispatch-rate : 每dispatch-rate-period秒钟复制集群的消息数量
–byte-dispatch-rate : 每dispatch-rate-period秒钟复制集群的总字节数
–dispatch-rate-period : 设置复制集群的速率, 比如 1 表示 每秒钟

8.2 获取topic的消息复制集群的速率

[root@node1 bin]# ./pulsar-admin namespaces get-replicator-dispatch-rate test-tenant/ns1

  "dispatchThrottlingRateInMsg" : 1000,
  "dispatchThrottlingRateInByte" : 1048576,
  "relativeToPublishRate" : false,
  "ratePeriodInSecond" : 1

1.5.3.Pulsar的topic相关操作

1.5.3.1.什么是Topic

Topic,话题主题的含义, 在一个名称空间下, 可以定义多个Topic 通过Topic进行数据的分类划分, 将不同的类别的消息放置到不同Topic, 消费者也可以从不同Topic中获取到相关的消息, 是一种更细粒度的消息划分操作, 同时在Topic下可以划分为多个分片, 进行分布式的存储操作, 每个分片下还存在有副本操作, 保证数据不丢失, 当然这些分片副本更多是由bookkeeper来提供支持。

Pulsar 提供持久化与非持久化两种topic。 持久化topic是消息发布、消费的逻辑端点。 持久化topic地址的命名格式如下:

persistent://tenant/namespace/topic

非持久topic应用在仅消费实时发布消息与不需要持久化保证的应用程序。 通过这种方式,它通过删除持久消息的开销来减少消息
发布延迟。 非持久化topic地址的命名格式如下:

non-persistent://tenant/namespace/topic

1.5.3.2. Pulsar Topic(主题)相关操作_基础操作

1-创建Topic
方式一:创建一个没有分区的topic

[root@node1 bin]# pwd
/export/server/pulsar-2.8.1/bin
[root@node1 bin]# ./pulsar-admin topics create persistent://test-tenant/ns1/my-topic

方式二:创建一个有分区的topic

./pulsar-admin topics create-partitioned-topic persistent://test-tenant/ns1/my-topic --partitions 4

注意:不管是有分区还是没有分区,创建topic后,如果没有任何操作,60s后pulsar会认为此topic是不活动的,会自动进行删除,以避免生成垃圾数据。

相关配置:
Brokerdeleteinactivetopicsenabenabled : 默认值为true 表示是否启动自动删除
BrokerDeleteInactiveTopicsFrequencySeconds: 默认为60s 表示检测未活动的时间

2 - 列出当前某个名称空间下的所有Topic

[root@node1 bin]# ./pulsar-admin topics list test-tenant/ns1
"persistent://test-tenant/ns1/my-topic-partition-2"
"persistent://test-tenant/ns1/my-topic-partition-3"
"persistent://test-tenant/ns1/my-topic-partition-0"
"persistent://test-tenant/ns1/my-topic-partition-1"
[root@node1 bin]#

3-更新Topic操作
我们可针对有分区的topic去更新其分区的数量。

[root@node1 bin]# ./pulsar-admin topics update-partitioned-topic persistent://test-tenant/ns1/my-topic --partitions 8
[root@node1 bin]# ./pulsar-admin topics list test-tenant/ns1
"persistent://test-tenant/ns1/my-topic-partition-2"
"persistent://test-tenant/ns1/my-topic-partition-3"
"persistent://test-tenant/ns1/my-topic-partition-4"
"persistent://test-tenant/ns1/my-topic-partition-5"
"persistent://test-tenant/ns1/my-topic-partition-6"
"persistent://test-tenant/ns1/my-topic-partition-7"
"persistent://test-tenant/ns1/my-topic-partition-0"
"persistent://test-tenant/ns1/my-topic-partition-1"
[root@node1 bin]#

4-删除Topic操作
删除没有分区的topic
准备:
./pulsar-admin namespaces create my-tenant/my-namespace (创建命名空间)
./pulsar-admin topics create persistent://my-tenant/my-namespace/my-topic (创建一个无分区的topic)
./pulsar-admin topics create-partitioned-topic persistent://my-tenant/my-namespace/my-topic2 --partitions 4 (创建一个有分区的topic)

删除没有分区的topic: 
./pulsar-admin topics delete persistent://my-tenant/my-namespace/my-topic
删除有分区的topic 
./pulsar-admin topics delete-partitioned-topic persistent://my-tenant/my-namespace/my-topic2 

1.5.3.3.Pulsar Topic(主题)相关操作_高级操作

1-授权

./pulsar-admin topics grant-permission --actions produce,consume --role application1 persistent://test-tenant/ns1/tp1

2-获取权限

pulsar-admin topics grant-permission --actions produce,consume --role application1 persistent://test- tenant/ns1/tp1 

3-取消权限

pulsar-admin topics revoke-permission --role application1 persistent://test-tenant/ns1/tp1 

  "application1": [ 
  "consume", 
  "produce" 
  ]

以上是关于05_Pulsar的主要组件介绍与命令使用名称空间Pulsar的topic相关操作Pulsar Topic(主题)相关操作_高级操作的主要内容,如果未能解决你的问题,请参考以下文章

01.pulsar基本介绍多租户模式云原生架构Segmented Streams支持跨地域复制pulsar组件介绍Pulsar IO (Connector)Pulsar与kafka的对比

07_Pulsar高级组件基本使用(Connector,Functions,事务)Function(轻量级计算流程)概念与使用Connector 连接器概念与使用,其它Connector

四万字32图,Kafka知识体系保姆级教程宝典

Python之Pulsar框架使用

四万字32图,Kafka知识体系保姆级教程宝典

Pulsar 消息概念2