05_Pulsar的主要组件介绍与命令使用名称空间Pulsar的topic相关操作Pulsar Topic(主题)相关操作_高级操作
Posted 涂作权的博客
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了05_Pulsar的主要组件介绍与命令使用名称空间Pulsar的topic相关操作Pulsar Topic(主题)相关操作_高级操作相关的知识,希望对你有一定的参考价值。
1.5.Apache Pulsar的主要组件介绍与命令使用
1.5.1.多租户模式
1.5.1.1. 什么是多租户
1.5.1.2.Pulsar多租户的相关特征_安全性(认证和授权)
1.5.1.3.Pulsar多租户的相关特性_隔离性
1.5.1.4.Pulsar多租户的相关操作
1-获取租户列表
2-创建租户
3-获取配置
4-更新配置
5-删除租户
====================
1.5.2.Pulsar的名称空间
1.5.2.1.什么是名称空间
1.5.2.2.Pulsar NameSpace(名称空间)相关操作_基本操作
1-在指定的租户下创建名称空间
2-获取所有的名称空间列表
3-删除名称空间
1.5.2.3.Pulsar Namespace(名称空间) 相关操作_高级操作
1-获取名称空间相关的配置策略
2-配置复制集群
2.1 设置backlog quota策略
2.2-获取给定命名空间复制集群的列表
3-配置backlog quota策略
3.1 - 获取backlog quota策略
3.2 - 获取backlog quota策略
3.3 - 移除backlog quota策略
4-配置持久化策略
4.1-设置持久化策略
4.2-获取持久化策略
5-配置消息存活时间(TTL)
5.1-设置消息存活时间
5.2 - 获取消息的存活时间
5.3 - 删除消息的存活时间
6 - 配置整个名称空间中Topic的消息发送速率。
6.1-设置Topic的消息发送的速率
6.2 获取topic的消息发送速率
7-配置整个名称空间中Topic的消息收速率
7.1-设置Topic的消息接收速率
7.2 获取topic的消息接收速率
8-配置整个名称空间中Topic的复制集群的速率
8.1-设置Topic的消息复制集群的速率
8.2 获取topic的消息复制集群的速率
====================
1.5.3.Pulsar的topic相关操作
1.5.3.1.什么是Topic
1.5.3.2. Pulsar Topic(主题)相关操作_基础操作
1-创建Topic
2 - 列出当前某个名称空间下的所有Topic
3-更新Topic操作
4-删除Topic操作
====================
1.5.3.3.Pulsar Topic(主题)相关操作_高级操作
1-授权
2-获取权限
3-取消权限
1.5.Apache Pulsar的主要组件介绍与命令使用
1.5.1.多租户模式
1.5.1.1. 什么是多租户
多租户是一种架构,目的是为了让多用户环境下使用同一套程序,且保证用户间数据隔离。
简单讲:在一台服务器上运行单个应用实例,它为多个租户(客户)提供服务。
Apache Pulsar 最初诞生于雅虎,当时就是为了解决雅虎内部各个部门之间数据的协调,所以多租户特性显得至关重用,Pulsar 从诞生之日起就考虑到多租户这一特性,并在后续的实现过程中,将其不断的完善。多租户这一特性,使得各个部门之间可以共享同一份数据,不用单独部署独立的系统来操作数据,很好的保证了各部门间数据一致性的问题,同时简化维护成本。
Pulsar 的多租户设计符合上述要求:
- 使用身份验证、授权和 ACL(访问控制列表)确保其安全性
- 为每个租户强制执行存储配额
- 支持在运行时更改隔离机制,从而实现操作成本低和管理简单
Pulsar的多租户性质主要体现在topic的URL中, 其结构如下:
persistent://tenant/namespace/topic
从URL中可以看出tenant(租户)是topic最基本的单元(比命名空间和topic名称更为基本)
1.5.1.2.Pulsar多租户的相关特征_安全性(认证和授权)
一个多租户系统需要在租户内提供系统级别的安全性,细分来讲,主要可以归类为以下两点:
- 租户只能访问它有权限访问的topics
- 不允许访问它无法访问的topics
在 Pulsar 中,多租户的安全性是通过身份验证和授权机制实现的。当 client 连接到 pulsar broker 时,broker 会使用身份验证插件来验证此客户端的身份,然后为其分配一个 string 类型的 role token。role token 主要有如下作用:
- 判断client是否有对topics进行生产和消费消息的权限
- 管理租户属性的配置
Pulsar目前支持以下几种身份认证,同时支持自定义实现自己的身份认证程序
- TLS客户端身份认证
- 雅虎的身份证系统:Athenz
- Kerberos
- JSON Web Token认证
1.5.1.3.Pulsar多租户的相关特性_隔离性
- 软隔离:通过磁盘配额,流量控制和限制等手段
存储:
Apache Pulsar使用Bookkeeper来作为其存储层,bookie是Bookkeeper的实例,Bookkeeper本身就是具有I/O分离(读写分离)的特征,可以很多的做好IO隔离,提升读写的效率。
同时, 不同的租户可以为不同的NameSpace配置不同的存储配额, 当租户内消息的大小达到了存储配额的限制, Pulsar会
采取相应的措施, 例如: 阻止消息生成, 抛异常 或丢弃数据等。
Broker:
每个Borker使用的内存资源都是有上限的, 当Broker达到配置的CPU或内存使用的阈值后, Pulsar会迅速的将流量转移到负载较小的Broker处理
在生产和消费方面, Pulsar都可以进行流量控制,租户可以配置发送和接收的速率,避免出现一个客户端占用当Broker
的所有处理资源
- 硬隔离:物理资源隔离
Pulsar 允许将某些租户或名称空间与特定 Broker 进行隔离。这可确保这些租户或命名空间可以充分利用该特定 Broker 上的资源。
1.5.1.4.Pulsar多租户的相关操作
- 1-获取租户列表
[root@node1 bin]# cd /export/server/pulsar-2.8.1/bin/
[root@node1 bin]# ./pulsar-admin tenants list
"public"
"pulsar"
[root@node1 bin]#
- 2-创建租户
[root@node1 bin]# ./pulsar-admin tenants create my-tenant
[root@node1 bin]# ./pulsar-admin tenants list
"my-tenant"
"public"
"pulsar"
[root@node1 bin]#
在创建租户时,可以使用-r或者–admin-roles标志分配管理角色。可以用逗号分隔的列表指定多个角色。
[root@node1 bin]# ./pulsar-admin tenants create my-tenant2 --admin-roles role1,role2,role3
[root@node1 bin]# ./pulsar-admin tenants list
"my-tenant"
"my-tenant2"
"public"
"pulsar"
[root@node1 bin]# ./pulsar-admin tenants create my-tenant3 -r role1
[root@node1 bin]#
- 3-获取配置
[root@node1 bin]# ./pulsar-admin tenants get my-tenant
"adminRoles" : [ ],
"allowedClusters" : [ "pulsar-cluster" ]
[root@node1 bin]# ./pulsar-admin tenants get my-tenant2
"adminRoles" : [ "role1", "role2", "role3" ],
"allowedClusters" : [ "pulsar-cluster" ]
[root@node1 bin]#
- 4-更新配置
[root@node1 bin]# ./pulsar-admin tenants update my-tenant
[root@node1 bin]# ./pulsar-admin tenants get my-tenant
"adminRoles" : [ ],
"allowedClusters" : [ "pulsar-cluster" ]
[root@node1 bin]# ./pulsar-admin tenants update my-tenant -r role1
[root@node1 bin]# ./pulsar-admin tenants get my-tenant
"adminRoles" : [ "role1" ],
"allowedClusters" : [ "pulsar-cluster" ]
[root@node1 bin]#
- 5-删除租户
注意:在删除的时候,如果库下已经有名称空间,是无法删除的,需要先删除名称空间。
[root@node1 bin]# ./pulsar-admin tenants list
"my-tenant"
"my-tenant2"
"my-tenant3"
"public"
"pulsar"
[root@node1 bin]# pwd
/export/server/pulsar-2.8.1/bin
[root@node1 bin]# ./pulsar-admin tenants delete my-tenant3
[root@node1 bin]# ./pulsar-admin tenants list
"my-tenant"
"my-tenant2"
"public"
"pulsar"
[root@node1 bin]# ./pulsar-admin tenants delete my-tenant
[root@node1 bin]#
1.5.2.Pulsar的名称空间
1.5.2.1.什么是名称空间
namespace是Pulsar中最基本的管理单元,在namespace这一层面,可以设置权限,调整副本设置,管理跨集群的消息复制,控制消息策略和执行关键操作。一个主题topic可以继承其所对应的namespace的属性,因此我们只需对namespace的属性进行设置,就可以一次性设置该namespace中所有主题topic的属性。
namespace有两种,分别是本地的namespace和全局的namespace:
- 本地namespace——仅对定义它的集群可见。
- 全局namespace——跨集群可见,可以是同一个数据中心的集群,也可以是跨地域中心的集群,这依赖于是否在namespace中设置了跨集群拷贝数据的功能。
虽然本地namespace和全局namespace的作用域不同,但是只要对他们进行适当的设置,都可以跨团队和跨组织共享。一
旦生产者获得了namespace的写入权限,那么它就可以往namespace中的所有topic主题写入数据,如果某个主题不存在,则在生产者第一次写入数据时动态创建。
1.5.2.2.Pulsar NameSpace(名称空间)相关操作_基本操作
- 1-在指定的租户下创建名称空间
[root@node1 bin]# pwd
/export/server/pulsar-2.8.1/bin
[root@node1 bin]# ./pulsar-admin tenants create test-tenant
[root@node1 bin]# ./pulsar-admin tenants list
"my-tenant"
"my-tenant2"
"public"
"pulsar"
"test-tenant"
[root@node1 bin]# ./pulsar-admin namespaces create test-tenant/test-namespace
[root@node1 bin]#
- 2-获取所有的名称空间列表
[root@node1 bin]# pwd
/export/server/pulsar-2.8.1/bin
[root@node1 bin]# ./pulsar-admin namespaces list test-tenant
"test-tenant/test-namespace"
[root@node1 bin]# ./pulsar-admin namespaces create test-tenant/ns1
[root@node1 bin]# ./pulsar-admin namespaces list test-tenant
"test-tenant/ns1"
"test-tenant/test-namespace"
[root@node1 bin]#
- 3-删除名称空间
[root@node1 bin]# ./pulsar-admin namespaces list test-tenant
"test-tenant/ns1"
"test-tenant/test-namespace"
[root@node1 bin]# pwd
/export/server/pulsar-2.8.1/bin
[root@node1 bin]# ./pulsar-admin namespaces delete test-tenant/ns1
[root@node1 bin]# ./pulsar-admin namespaces list test-tenant
"test-tenant/test-namespace"
[root@node1 bin]#
1.5.2.3.Pulsar Namespace(名称空间) 相关操作_高级操作
- 1-获取名称空间相关的配置策略
[root@node1 bin]# pwd
/export/server/pulsar-2.8.1/bin
[root@node1 bin]# ./pulsar-admin namespaces policies test-tenant/test-namespace
"auth_policies" :
"namespace_auth" : ,
"destination_auth" : ,
"subscription_auth_roles" :
,
"replication_clusters" : [ "pulsar-cluster" ],
"bundles" :
"boundaries" : [ "0x00000000", "0x40000000", "0x80000000", "0xc0000000", "0xffffffff" ],
"numBundles" : 4
,
"backlog_quota_map" : ,
"clusterDispatchRate" : ,
"topicDispatchRate" : ,
"subscriptionDispatchRate" : ,
"replicatorDispatchRate" : ,
"clusterSubscribeRate" : ,
"publishMaxMessageRate" : ,
"latency_stats_sample_rate" : ,
"deleted" : false,
"encryption_required" : false,
"subscription_auth_mode" : "None",
"offload_threshold" : -1,
"schema_auto_update_compatibility_strategy" : "Full",
"schema_compatibility_strategy" : "UNDEFINED",
"is_allow_auto_update_schema" : true,
"schema_validation_enforced" : false,
"subscription_types_enabled" : [ ],
"properties" :
[root@node1 bin]#
- 2-配置复制集群
[root@node1 bin]# ./pulsar-admin namespaces create test-tenant/ns1
[root@node1 bin]# ./pulsar-admin namespaces get-clusters test-tenant/ns1
"pulsar-cluster"
# 2.1 设置backlog quota策略
[root@node1 bin]# ./pulsar-admin namespaces set-clusters test-tenant/ns1 --clusters cl2
12:01:55.990 [AsyncHttpClient-7-1] WARN org.apache.pulsar.client.admin.internal.BaseResource - [http://node1:8080/admin/v2/namespaces/test-tenant/ns1/replication] Failed to perform http post request: javax.ws.rs.ForbiddenException: HTTP 403 Forbidden
Invalid cluster id: cl2
Reason: Invalid cluster id: cl2
[root@node1 bin]#
# 2.2-获取给定命名空间复制集群的列表
[root@node1 bin]# ./pulsar-admin namespaces get-clusters test-tenant/test-namespace
"pulsar-cluster"
[root@node1 bin]#
- 3-配置backlog quota策略
待定配额帮助Broker在某个名称空间达到某个阈值限制时限制其带宽/存储。管理员可以设置限制,并在达到限制后采取相应的行动。
3.1 - 获取backlog quota策略
[root@node1 bin]# pwd
/export/server/pulsar-2.8.1/bin
[root@node1 bin]# ./pulsar-admin namespaces set-backlog-quota --limit 10G --limitTime 36000 --policy producer_request_hold test-tenant/ns1
--policy 的值选择:
producer_request_hold:broker暂停运行,并不再持久化生产请求负载。
producer_exception:broker抛出异常,并与客户端断开连接。
consumer_backlog_eviction:broker 丢弃积压消息。
3.2 - 获取backlog quota策略
[root@node1 bin]# ./pulsar-admin namespaces get-backlog-quotas test-tenant/ns1
"destination_storage BacklogQuotaImpl(limitSize=10737418240, limitTime=36000, policy=producer_request_hold)"
3.3 - 移除backlog quota策略
[root@node1 bin]# pwd
/export/server/pulsar-2.8.1/bin
[root@node1 bin]# ./pulsar-admin namespaces get-backlog-quotas test-tenant/ns1
"destination_storage BacklogQuotaImpl(limitSize=10737418240, limitTime=36000, policy=producer_request_hold)"
[root@node1 bin]# ./pulsar-admin namespaces remove-backlog-quota test-tenant/ns1
[root@node1 bin]# ./pulsar-admin namespaces get-backlog-quotas test-tenant/ns1
[root@node1 bin]#
- 4-配置持久化策略
持久化策略可以给定命名空间下topic上的所有消息配置持久等级。
4.1-设置持久化策略
[root@node1 bin]# ./pulsar-admin namespaces set-persistence --bookkeeper-ack-quorum 2 --bookkeeper-ensemble 3 --bookkeeper-write-quorum 2 --ml-mark-delete-max-rate 0 test-tenant/ns1
参数说明:
bookkeeper-ack-quorum:每个entry 在等待的 acks(有保证的副本)数量,默认值:0
bookkeeper-ensemble:单个topic 使用的 bookie 数量,默认值:0
bookkeeper-write-quorum:每个entry 要写入的次数,默认值:0
ml-mark-delete-max-rate:标记-删除操作的限制速率(0表示无限制),默认值:0.0
4.2-获取持久化策略
[root@node1 bin]# ./pulsar-admin namespaces get-persistence test-tenant/ns1
"bookkeeperEnsemble" : 3,
"bookkeeperWriteQuorum" : 2,
"bookkeeperAckQuorum" : 2,
"managedLedgerMaxMarkDeleteRate" : 0.0
[root@node1 bin]#
- 5-配置消息存活时间(TTL)
以秒为单位
5.1-设置消息存活时间
[root@node1 bin]# ./pulsar-admin namespaces set-message-ttl --messageTTL 100 test-tenant/ns1
5.2 - 获取消息的存活时间
[root@node1 bin]# ./pulsar-admin namespaces get-message-ttl test-tenant/ns1
100
5.3 - 删除消息的存活时间
[root@node1 bin]# ./pulsar-admin namespaces get-message-ttl test-tenant/ns1
100
[root@node1 bin]# ./pulsar-admin namespaces remove-message-ttl test-tenant/ns1
[root@node1 bin]# ./pulsar-admin namespaces get-message-ttl test-tenant/ns1
null
[root@node1 bin]#
6 - 配置整个名称空间中Topic的消息发送速率。
6.1-设置Topic的消息发送的速率
[root@node1 bin]# ./pulsar-admin namespaces set-dispatch-rate test-tenant/ns1 \\
> --msg-dispatch-rate 1000 \\
> --byte-dispatch-rate 1048576 \\
> --dispatch-rate-period 1
参数说明:
--msg-dispatch-rate : 每dispatch-rate-period秒钟发送的消息数量
--byte-dispatch-rate : 每dispatch-rate-period秒钟发送的总字节数
--dispatch-rate-period : 设置发送的速率, 比如 1 表示 每秒钟
6.2 获取topic的消息发送速率
[root@node1 bin]# ./pulsar-admin namespaces get-dispatch-rate test-tenant/ns1
"dispatchThrottlingRateInMsg" : 1000,
"dispatchThrottlingRateInByte" : 1048576,
"relativeToPublishRate" : false,
"ratePeriodInSecond" : 1
[root@node1 bin]#
7-配置整个名称空间中Topic的消息收速率
7.1-设置Topic的消息接收速率
./pulsar-admin namespaces set-subscription-dispatch-rate test-tenant/ns1 \\
--msg-dispatch-rate 1000 \\
--byte-dispatch-rate 1048576 \\
--dispatch-rate-period 1
参数说明:
–msg-dispatch-rate: 每dispatch-rate-period秒钟接收的消息数据。
–byte-dispatch-rate: 每dispatch-rate-period秒钟接收的总字节数
–dispatch-rate-period: 设置接收的速率,比如1表示 每秒钟
7.2 获取topic的消息接收速率
[root@node1 bin]# ./pulsar-admin namespaces get-subscription-dispatch-rate test-tenant/ns1
"dispatchThrottlingRateInMsg" : 1000,
"dispatchThrottlingRateInByte" : 1048576,
"relativeToPublishRate" : false,
"ratePeriodInSecond" : 1
8-配置整个名称空间中Topic的复制集群的速率
8.1-设置Topic的消息复制集群的速率
[root@node1 bin]# ./pulsar-admin namespaces set-replicator-dispatch-rate test-tenant/ns1 \\
> --msg-dispatch-rate 1000 \\
> --byte-dispatch-rate 1048576 \\
> --dispatch-rate-period 1
参数说明:
–msg-dispatch-rate : 每dispatch-rate-period秒钟复制集群的消息数量
–byte-dispatch-rate : 每dispatch-rate-period秒钟复制集群的总字节数
–dispatch-rate-period : 设置复制集群的速率, 比如 1 表示 每秒钟
8.2 获取topic的消息复制集群的速率
[root@node1 bin]# ./pulsar-admin namespaces get-replicator-dispatch-rate test-tenant/ns1
"dispatchThrottlingRateInMsg" : 1000,
"dispatchThrottlingRateInByte" : 1048576,
"relativeToPublishRate" : false,
"ratePeriodInSecond" : 1
1.5.3.Pulsar的topic相关操作
1.5.3.1.什么是Topic
Topic,话题主题的含义, 在一个名称空间下, 可以定义多个Topic 通过Topic进行数据的分类划分, 将不同的类别的消息放置到不同Topic, 消费者也可以从不同Topic中获取到相关的消息, 是一种更细粒度的消息划分操作, 同时在Topic下可以划分为多个分片, 进行分布式的存储操作, 每个分片下还存在有副本操作, 保证数据不丢失, 当然这些分片副本更多是由bookkeeper来提供支持。
Pulsar 提供持久化与非持久化两种topic。 持久化topic是消息发布、消费的逻辑端点。 持久化topic地址的命名格式如下:
persistent://tenant/namespace/topic
非持久topic应用在仅消费实时发布消息与不需要持久化保证的应用程序。 通过这种方式,它通过删除持久消息的开销来减少消息
发布延迟。 非持久化topic地址的命名格式如下:
non-persistent://tenant/namespace/topic
1.5.3.2. Pulsar Topic(主题)相关操作_基础操作
1-创建Topic
方式一:创建一个没有分区的topic
[root@node1 bin]# pwd
/export/server/pulsar-2.8.1/bin
[root@node1 bin]# ./pulsar-admin topics create persistent://test-tenant/ns1/my-topic
方式二:创建一个有分区的topic
./pulsar-admin topics create-partitioned-topic persistent://test-tenant/ns1/my-topic --partitions 4
注意:不管是有分区还是没有分区,创建topic后,如果没有任何操作,60s后pulsar会认为此topic是不活动的,会自动进行删除,以避免生成垃圾数据。
相关配置:
Brokerdeleteinactivetopicsenabenabled : 默认值为true 表示是否启动自动删除
BrokerDeleteInactiveTopicsFrequencySeconds: 默认为60s 表示检测未活动的时间
2 - 列出当前某个名称空间下的所有Topic
[root@node1 bin]# ./pulsar-admin topics list test-tenant/ns1
"persistent://test-tenant/ns1/my-topic-partition-2"
"persistent://test-tenant/ns1/my-topic-partition-3"
"persistent://test-tenant/ns1/my-topic-partition-0"
"persistent://test-tenant/ns1/my-topic-partition-1"
[root@node1 bin]#
3-更新Topic操作
我们可针对有分区的topic去更新其分区的数量。
[root@node1 bin]# ./pulsar-admin topics update-partitioned-topic persistent://test-tenant/ns1/my-topic --partitions 8
[root@node1 bin]# ./pulsar-admin topics list test-tenant/ns1
"persistent://test-tenant/ns1/my-topic-partition-2"
"persistent://test-tenant/ns1/my-topic-partition-3"
"persistent://test-tenant/ns1/my-topic-partition-4"
"persistent://test-tenant/ns1/my-topic-partition-5"
"persistent://test-tenant/ns1/my-topic-partition-6"
"persistent://test-tenant/ns1/my-topic-partition-7"
"persistent://test-tenant/ns1/my-topic-partition-0"
"persistent://test-tenant/ns1/my-topic-partition-1"
[root@node1 bin]#
4-删除Topic操作
删除没有分区的topic
准备:
./pulsar-admin namespaces create my-tenant/my-namespace (创建命名空间)
./pulsar-admin topics create persistent://my-tenant/my-namespace/my-topic (创建一个无分区的topic)
./pulsar-admin topics create-partitioned-topic persistent://my-tenant/my-namespace/my-topic2 --partitions 4 (创建一个有分区的topic)
删除没有分区的topic:
./pulsar-admin topics delete persistent://my-tenant/my-namespace/my-topic
删除有分区的topic
./pulsar-admin topics delete-partitioned-topic persistent://my-tenant/my-namespace/my-topic2
1.5.3.3.Pulsar Topic(主题)相关操作_高级操作
1-授权
./pulsar-admin topics grant-permission --actions produce,consume --role application1 persistent://test-tenant/ns1/tp1
2-获取权限
pulsar-admin topics grant-permission --actions produce,consume --role application1 persistent://test- tenant/ns1/tp1
3-取消权限
pulsar-admin topics revoke-permission --role application1 persistent://test-tenant/ns1/tp1
"application1": [
"consume",
"produce"
]
以上是关于05_Pulsar的主要组件介绍与命令使用名称空间Pulsar的topic相关操作Pulsar Topic(主题)相关操作_高级操作的主要内容,如果未能解决你的问题,请参考以下文章
01.pulsar基本介绍多租户模式云原生架构Segmented Streams支持跨地域复制pulsar组件介绍Pulsar IO (Connector)Pulsar与kafka的对比
07_Pulsar高级组件基本使用(Connector,Functions,事务)Function(轻量级计算流程)概念与使用Connector 连接器概念与使用,其它Connector