Pulsar 介绍
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Pulsar 介绍相关的知识,希望对你有一定的参考价值。
参考技术A 本文将介绍 pulsar 的核心功能,不会对设计细节以及与其他 MQ 进行对比Apache Pulsar 是一个多租户、高性能的发布-订阅消息中间件, Pulsar 最初由雅虎开发。从 2018 年 9 月以来,它是 Apache 基金会的顶级项目之一,由 Apache 软件基金会管理。 更多历史背景参考 。截止当前发文, 项目仓库 已有 9.7k 个 star 和 451 位 contributions,目前最新版本为 2.8.1
和大多数消息中间件一样,Pulsar 也是建立在发布订阅模式上。在这种模式中,producer 向 broker 中的 topic 发送 message。consumer 订阅这些 topic 并处理传入的消息。不同之处在于 pulsar 中的 broker 不会物理存储 message,而是交给 apache bookkeeper 单独处理,broker 只是充当一个消息路由寻址的功能
从宏观角度来看,多个 Broker 节点组成一个 Pulsar Cluster;多个 Pulsar Cluster 组成一个 Pulsar Instance,Pulsar 通过 geo-replication 支持 Pulsar Instance 内在不同的集群发送和消费消息。在 pulsar 集群中:
Pulsar 中的 broker 是一个无状态的组件,主要负责两部分:
Producer 发送 Message 到 Topic 时,除了支持 sync、async 的发送方式之外,在与 Topic 连接上, 还增加了 Excusive、Share、WaitForExclusive 三种语义:
Consumer 通过 subscription 来连接到 Topic,每次通过发送 flow permit request 给 Broker 来获取 Message,同时将 Message 放入本地维护的一个 buffer queue (默认大小为 1000)队列来缓冲,每次 consumer.receive() 调用时,将从 buffer queue 获取消息
类似的, consumer 也支持两个接收消息的方式:sync、async
Topic 在 pulsar 中被当做是一个渠道来传输 Producer 和 Conusmer 之间的消息。一个完整的 Topic 结构为:persistent|non-persistent//tenant/namespace/topic
由 4 部分构成:
每条消息在消息的日志上都有一个偏移量 (offset),Pulsar 使用 Subscrtion 来跟踪这个偏移量(offset),通过 Subscrtion 可以控制 Consumer 消费消息的方式,在 pulsar 中提供了四种消费方式: exclusive 、shared、 failover, and key_shared.
即独占模式,在独占模式下,只允许一个 consumer 连接到 Subscription 上。如果多个 consumer 使用同一个 Subscription 订阅一个 Topic ,则会发生错误
共享模式,允许多个 consumer 使用同一个 Subscription 订阅 Topic,Message 将会循环的发给多个 Consumer,
按 key 共享,与 Shared 模式相同,允许使用同一个 Subscription 订阅 Topic,不同的是 Message 在 Consumer 的消费中,具有相同 key 或 orderingKey 的 Message 只传递给同一个 Consumer。无论消息被重新传递多少次,它都会传递给同一个消费者(必须指定 orderingKey 或 key)
普通的 Topic 仅由一个 Broker 处理,但单个 Broker 处理瓶颈将限制 Topic 的最大吞吐量,Partitioned Topic 分区主题通过多 Broker 并发处理来提高了 Topic 的吞吐量.
分区主题实际上在 pulsar 中是由多个内部主题构成,每个主题归属于某一个 Broker 上,message 和对应 broker 上的内部主题路由由 pulsar 来维护
Topic1 主题有五个分区(P0 到 P4),分布在三个 Broker 上。因为分区比 Broker 多,两个 Broker 一个处理两个分区,而第三个只处理一个(同样,Pulsar 会自动处理这种分区的分布)
通常将 producer 发送 message 到 parition topic 的消息路由方式和消费者通过 Subscription 订阅 Topic 分开讨论,分区消息路由方式决定了吞吐量的高低;而 consumer 通过 subscription 订阅消费者则由应用程序业务来决定
分区主题和普通主题在 Subscription 一个 Topic 上没有区别,,因为分区仅仅决定消息从生产者发布到消费者处理和 ACK 确认之间逻辑
消息的顺序与消息路由模式和消息的 key 有关,通常需要保证具有相同 key 的消息在同一个 topic 中具有顺序性。如果消息中带有 key 属性,那么这个消息在 RoundRobinPatition 和 SinglePartition 路由模式中将具有顺序性
通过对 Pulsar 组成的一些功能模块进行简单介绍来大致了解其是如何运转的(producer->broker->consumer)、由那些组件构成的(pulsar-instance、zookeeper、bookker),以及有哪些功能特性(route mode、subscription、partition ),后面将通过源码层来深度对一些优秀的设计进行剖析
pulsar
消息服务—顺带Pulsar简单介绍
前序系列文章>>>
开放消息平台主要通过 Pulsar 主动推送各种事件数据给外部合作伙伴,以满足合作伙伴对消息实时性和消息持久化的要求。
一、Pulsar
? 对于Pulsar的介绍,大家可以看一下这位大佬的简单介绍>>>pulsar-介绍。简单来说,Pulsar最初由雅虎开发,现在由 Apache 软件基金会管理,是一个支持多租户、高性能的服务器到服务器之间消息通讯的解决方案。
? 涂鸦智能基于开源的 Pulsar 系统进行了定制改进,按照涂鸦智能提供的 Pulsar SDK 可完成消息接入。
? Pulsar 作为消息代理采用了Pub/Sub(发布订阅)的设计模式。该设计模式中,生产者将消息发布到主题,然后消费者可以订阅这些主题,处理传入消息,并在处理完成时发送确认。当订阅被创建时(即使消息处理设备已断开连接)所有的消息都将被 Pulsar 保留。只有在消息处理设备确认消息被成功处理后,保留下来的消息才会被丢弃。
? 此外,一个主题可以由多个消费者订阅,并且当消费者成功处理消息时,它需要向代理发送确认,以便代理可以丢弃该消息。涂鸦智能的 Pulsar 消息分发器(Broker)为每个主题分配了多个分区,Pulsar 消息分发器将根据分区和消费者分发消息。
二、相关说明
安全:
- 认证安全:涂鸦智能 Pulsar 消息系统针对身份认证进行了深度定制以满足高安全性要求,涂鸦智能采用动态令牌机制增强安全,开发者可忽略实现细节,基于涂鸦智能提供的 SDK 完成认证。
- 数据安全:
|- 传输安全:涂鸦智能 Pulsar 消息推送系统基于 SSL 传输数据。
|- 业务安全:业务数据均采用 AES-128 加密,请求均附上签名。
请求方式:
具体参考 ConsumerExample 代码。接入时需要提供以下三部分数据:
- username:填写云开发平台中 API 授权密钥的 Access ID。
- password:填写云开发平台中 API 授权密钥的 Access Secret。
- url:根据调用的区域进行选择。
中国区:pulsar+ssl://mqe.tuyacn.com:7285/
美国区:pulsar+ssl://mqe.tuyaus.com:7285/
欧洲区:pulsar+ssl://mqe.tuyaeu.com:7285/
印度区:pulsar+ssl://mqe.tuyain.com:7285/
开通方式:- 登录 IoT 工作台 > 云开发。
-
创建项目。
- 在项目中的消息订阅模块,开通消息订阅功能。
代码示例:
String url = "";
String accessId = "";
String accessKey = "";
MqConsumer mqConsumer = MqConsumer.build()
.serviceUrl(url)
.accessId(accessId)
.accessKey(accessKey)
.maxRedeliverCount(3)
.messageListener(new MqConsumer.IMessageListener() {
@Override
public void onMessageArrived(Message message) throws Exception {
//write your own message processing logic
}
});
mqConsumer.start();
数据格式:
{
"protocol": 4,
"pv": "2.0",
"t": 146052438362,
"data":"4FDEE3FE59FCD76E260F7115011D65C7FD2AF59BFA4DC29E5DDF3FDA6BD5447E02F679052C34BBAAB7BB0EFEED62C760FD2AF59BFA4DC29E5DDF3FDA6BD5447EC660C816075824E004EC0123DE4FD1B638BB633A478EB2C2004EF4289276****",
"sign": "58285279b5b5790c7d917de88b3e****"
}
数据签名:
在获得真正的data数据前,可以设置防篡改 MD5 签名校验。签名算法执行步骤如下:
- 将收到的 JSON 格式的每个参数(除sign和值为空外)格式化为key=val。
- 进行组装(使用key升序)。组装后的字符串格式示例:k1=v1||k2=v2。
- 添加密钥。例如:k1=v1||k2=v2...kn=vn||key。
- 进行整串字符串的 MD5 签名。
- 如果 MD5 值和收到数据的 sign 一致,则表明数据没有被篡改;否则视为被篡改。
签名校验通过后,对数据进行解密:
- 先对数据进行 Base64 解码。
- 通过 AES (ECB 模式)对 accessKey 的中间 16 位代码进行解密 ,从而得到真正的设备状态数据。
解密后数据格式如下:
{
"devId": "002dj00118fe34d9****",
"productKey": "开发者平台定义产品对应的产品 Key",
"dataId":"1459168450ddfdfoiopiopi****",//全局唯一 ID, 数据上报唯一标识 ID
"status": [
{
"code":"switch",
"value":false,
"mode":"rw",
"t":146052438362
},
{
"code":"work_mode",
"value":"colour",
"mode":"rw",
"t":146052438362
}
]
}
三、业务数据
协议号:协议号(不同协议号代表了不同的功能)。
设备数据上报事件:
{
"devId": "002dj00118fe34d9****",
"productKey": "开发者平台定义产品对应的产品 Key",
"dataId": "1459168450ddfdfoiopiopi****",
"status": [
{
"数据点编码1(产品定义时对应的数据点编码)": "数据点对应的值",
"t": 1540615024283 // 数据点状态发生时间
},
{
"数据点编码2(产品定义时对应的数据点编码)": "数据点对应的值",
"t": 1540615024283 // 数据点状态发生时间
}
]
}
其他事件:
数据示例
{
"devId": "002dj00118fe34d9****",
"productKey": "开发者平台定义产品对应的产品 Key",
"bizCode": "online",
"bizData": {
"time": 146052438362
}
}
- 设备离线:
bizData说明
数据示例
{
"devId": "002dj00118fe34d9****",
"productKey": "开发者平台定义产品对应的产品 Key",
"bizCode": "offline",
"bizData": {
"time": 146052438362
}
}
- 设备名变更
bizData 说明
数据示例
{
"devId": "002dj00118fe34d9****",
"productKey": "开发者平台定义产品对应的产品 Key",
"bizCode": "nameUpdate",
"bizData": {
"devId": "002dj00118fe34d9****",
"name": "new name"
}
}
- 设备 DP 名变更
bizData 说明
数据示例
{
"devId": "002dj00118fe34d9****",
"productKey": "开发者平台定义产品对应的产品 Key",
"bizCode": "dpNameUpdate",
"bizData": {
"devId": "002dj00118fe34d9****",
"name": "new name",
"dpId": "dpId"
}
}
- 设备绑定
bizData 说明
数据示例
{
"devId": "002dj00118fe34d9****",
"productKey": "开发者平台定义产品对应的产品 Key",
"bizCode": "bindUser",
"bizData": {
"devId": "002dj00118fe34d9****",
"uuid": "06200043b4e618c1****",
"uid":"ay1529485403390****",
"token": "IIpQ****"
}
}
设备移除
bizData 说明
数据示例
{
"devId": "002dj00118fe34d9****",
"productKey": "开发者平台定义产品对应的产品 Key",
"bizCode": "delete",
"bizData": {
"devId": "002dj00118fe34d9****",
"uid":"ay1529485403390S****"
}
}
- 设备升级状态
bizData 说明
数据示例
{
"bizCode":"upgradeStatus",
"bizData": {
"devId":"6ca8756d*****1b4ewsdn",
"moduleType":0,
"upgradeStatus":2,
"description":"升级测试",
"oldVersion": "1.0.1",
"newVersion": "1.0.2"
},
"devId":"6ca8756d*****1b4ewsdn",
"productKey":"vFHpaEFwu8UD****",
"ts":1562232522192
}
以上是关于Pulsar 介绍的主要内容,如果未能解决你的问题,请参考以下文章
02_Pulsar的集群架构架构基本介绍Pulsar提供的组件介绍Brokers介绍Zookeeper的元数据存储基于bookKeeper持久化存储Pulsar代理
05_Pulsar的主要组件介绍与命令使用名称空间Pulsar的topic相关操作Pulsar Topic(主题)相关操作_高级操作