Pulsar 介绍

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Pulsar 介绍相关的知识,希望对你有一定的参考价值。

参考技术A 本文将介绍 pulsar 的核心功能,不会对设计细节以及与其他 MQ 进行对比

Apache Pulsar 是一个多租户、高性能的发布-订阅消息中间件, Pulsar 最初由雅虎开发。从 2018 年 9 月以来,它是 Apache 基金会的顶级项目之一,由 Apache 软件基金会管理。 更多历史背景参考 。截止当前发文, 项目仓库 已有 9.7k 个 star 和 451 位 contributions,目前最新版本为 2.8.1

和大多数消息中间件一样,Pulsar 也是建立在发布订阅模式上。在这种模式中,producer 向 broker 中的 topic 发送 message。consumer 订阅这些 topic 并处理传入的消息。不同之处在于 pulsar 中的 broker 不会物理存储 message,而是交给 apache bookkeeper 单独处理,broker 只是充当一个消息路由寻址的功能

从宏观角度来看,多个 Broker 节点组成一个 Pulsar Cluster;多个 Pulsar Cluster 组成一个 Pulsar Instance,Pulsar 通过 geo-replication 支持 Pulsar Instance 内在不同的集群发送和消费消息。在 pulsar 集群中:

Pulsar 中的 broker 是一个无状态的组件,主要负责两部分:

Producer 发送 Message 到 Topic 时,除了支持 sync、async 的发送方式之外,在与 Topic 连接上, 还增加了 Excusive、Share、WaitForExclusive 三种语义:

Consumer 通过 subscription 来连接到 Topic,每次通过发送 flow permit request 给 Broker 来获取 Message,同时将 Message 放入本地维护的一个 buffer queue (默认大小为 1000)队列来缓冲,每次 consumer.receive() 调用时,将从 buffer queue 获取消息

类似的, consumer 也支持两个接收消息的方式:sync、async

Topic 在 pulsar 中被当做是一个渠道来传输 Producer 和 Conusmer 之间的消息。一个完整的 Topic 结构为:persistent|non-persistent//tenant/namespace/topic
由 4 部分构成:

每条消息在消息的日志上都有一个偏移量 (offset),Pulsar 使用 Subscrtion 来跟踪这个偏移量(offset),通过 Subscrtion 可以控制 Consumer 消费消息的方式,在 pulsar 中提供了四种消费方式: exclusive 、shared、 failover, and key_shared.

即独占模式,在独占模式下,只允许一个 consumer 连接到 Subscription 上。如果多个 consumer 使用同一个 Subscription 订阅一个 Topic ,则会发生错误

共享模式,允许多个 consumer 使用同一个 Subscription 订阅 Topic,Message 将会循环的发给多个 Consumer,

按 key 共享,与 Shared 模式相同,允许使用同一个 Subscription 订阅 Topic,不同的是 Message 在 Consumer 的消费中,具有相同 key 或 orderingKey 的 Message 只传递给同一个 Consumer。无论消息被重新传递多少次,它都会传递给同一个消费者(必须指定 orderingKey 或 key)

普通的 Topic 仅由一个 Broker 处理,但单个 Broker 处理瓶颈将限制 Topic 的最大吞吐量,Partitioned Topic 分区主题通过多 Broker 并发处理来提高了 Topic 的吞吐量.

分区主题实际上在 pulsar 中是由多个内部主题构成,每个主题归属于某一个 Broker 上,message 和对应 broker 上的内部主题路由由 pulsar 来维护

Topic1 主题有五个分区(P0 到 P4),分布在三个 Broker 上。因为分区比 Broker 多,两个 Broker 一个处理两个分区,而第三个只处理一个(同样,Pulsar 会自动处理这种分区的分布)

通常将 producer 发送 message 到 parition topic 的消息路由方式和消费者通过 Subscription 订阅 Topic 分开讨论,分区消息路由方式决定了吞吐量的高低;而 consumer 通过 subscription 订阅消费者则由应用程序业务来决定

分区主题和普通主题在 Subscription 一个 Topic 上没有区别,,因为分区仅仅决定消息从生产者发布到消费者处理和 ACK 确认之间逻辑

消息的顺序与消息路由模式和消息的 key 有关,通常需要保证具有相同 key 的消息在同一个 topic 中具有顺序性。如果消息中带有 key 属性,那么这个消息在 RoundRobinPatition 和 SinglePartition 路由模式中将具有顺序性

通过对 Pulsar 组成的一些功能模块进行简单介绍来大致了解其是如何运转的(producer->broker->consumer)、由那些组件构成的(pulsar-instance、zookeeper、bookker),以及有哪些功能特性(route mode、subscription、partition ),后面将通过源码层来深度对一些优秀的设计进行剖析

pulsar

消息服务—顺带Pulsar简单介绍

前序系列文章>>>

标准指令集

开放消息平台主要通过 Pulsar 主动推送各种事件数据给外部合作伙伴,以满足合作伙伴对消息实时性和消息持久化的要求。

一、Pulsar

? 对于Pulsar的介绍,大家可以看一下这位大佬的简单介绍>>>pulsar-介绍。简单来说,Pulsar最初由雅虎开发,现在由 Apache 软件基金会管理,是一个支持多租户、高性能的服务器到服务器之间消息通讯的解决方案。

? 涂鸦智能基于开源的 Pulsar 系统进行了定制改进,按照涂鸦智能提供的 Pulsar SDK 可完成消息接入。

? Pulsar 作为消息代理采用了Pub/Sub(发布订阅)的设计模式。该设计模式中,生产者将消息发布到主题,然后消费者可以订阅这些主题,处理传入消息,并在处理完成时发送确认。当订阅被创建时(即使消息处理设备已断开连接)所有的消息都将被 Pulsar 保留。只有在消息处理设备确认消息被成功处理后,保留下来的消息才会被丢弃。

? 此外,一个主题可以由多个消费者订阅,并且当消费者成功处理消息时,它需要向代理发送确认,以便代理可以丢弃该消息。涂鸦智能的 Pulsar 消息分发器(Broker)为每个主题分配了多个分区,Pulsar 消息分发器将根据分区和消费者分发消息。

二、相关说明

安全:

  • 认证安全:涂鸦智能 Pulsar 消息系统针对身份认证进行了深度定制以满足高安全性要求,涂鸦智能采用动态令牌机制增强安全,开发者可忽略实现细节,基于涂鸦智能提供的 SDK 完成认证。
  • 数据安全:
    |- 传输安全:涂鸦智能 Pulsar 消息推送系统基于 SSL 传输数据。
    |- 业务安全:业务数据均采用 AES-128 加密,请求均附上签名。
    请求方式:

具体参考 ConsumerExample 代码。接入时需要提供以下三部分数据:

  • username:填写云开发平台中 API 授权密钥的 Access ID。
  • password:填写云开发平台中 API 授权密钥的 Access Secret。
  • url:根据调用的区域进行选择。
    中国区:pulsar+ssl://mqe.tuyacn.com:7285/
    美国区:pulsar+ssl://mqe.tuyaus.com:7285/
    欧洲区:pulsar+ssl://mqe.tuyaeu.com:7285/
    印度区:pulsar+ssl://mqe.tuyain.com:7285/

    开通方式:
    1. 登录 IoT 工作台 > 云开发。
  1. 创建项目。

  2. 在项目中的消息订阅模块,开通消息订阅功能。

代码示例:

String url = "";
String accessId = "";
String accessKey = "";
MqConsumer mqConsumer = MqConsumer.build()
    .serviceUrl(url)
    .accessId(accessId)
    .accessKey(accessKey)
    .maxRedeliverCount(3)
    .messageListener(new MqConsumer.IMessageListener() {
     @Override
     public void onMessageArrived(Message message) throws Exception {
       //write your own message processing logic
     }
    });
mqConsumer.start();

数据格式:

技术图片

{
   "protocol": 4,
   "pv": "2.0",
   "t": 146052438362,
"data":"4FDEE3FE59FCD76E260F7115011D65C7FD2AF59BFA4DC29E5DDF3FDA6BD5447E02F679052C34BBAAB7BB0EFEED62C760FD2AF59BFA4DC29E5DDF3FDA6BD5447EC660C816075824E004EC0123DE4FD1B638BB633A478EB2C2004EF4289276****",
   "sign": "58285279b5b5790c7d917de88b3e****"
}

数据签名:

在获得真正的data数据前,可以设置防篡改 MD5 签名校验。签名算法执行步骤如下:

  1. 将收到的 JSON 格式的每个参数(除sign和值为空外)格式化为key=val。
  2. 进行组装(使用key升序)。组装后的字符串格式示例:k1=v1||k2=v2。
  3. 添加密钥。例如:k1=v1||k2=v2...kn=vn||key。
  4. 进行整串字符串的 MD5 签名。
  5. 如果 MD5 值和收到数据的 sign 一致,则表明数据没有被篡改;否则视为被篡改。

签名校验通过后,对数据进行解密:

  1. 先对数据进行 Base64 解码。
  2. 通过 AES (ECB 模式)对 accessKey 的中间 16 位代码进行解密 ,从而得到真正的设备状态数据。

解密后数据格式如下:

{
    "devId": "002dj00118fe34d9****",
    "productKey": "开发者平台定义产品对应的产品 Key",
    "dataId":"1459168450ddfdfoiopiopi****",//全局唯一 ID, 数据上报唯一标识 ID
    "status": [
    {
      "code":"switch",
      "value":false,
      "mode":"rw",
      "t":146052438362
    },
    {
      "code":"work_mode",
      "value":"colour",
      "mode":"rw",
      "t":146052438362
    }
  ]
}


三、业务数据

协议号:协议号(不同协议号代表了不同的功能)。

技术图片

设备数据上报事件:

{
  "devId": "002dj00118fe34d9****",
  "productKey": "开发者平台定义产品对应的产品 Key",
  "dataId": "1459168450ddfdfoiopiopi****",
  "status": [
    {
      "数据点编码1(产品定义时对应的数据点编码)": "数据点对应的值",
      "t": 1540615024283  // 数据点状态发生时间
    },
    {
      "数据点编码2(产品定义时对应的数据点编码)": "数据点对应的值",
      "t": 1540615024283 // 数据点状态发生时间
    }
  ]
}

其他事件:

技术图片
数据示例


{
  "devId": "002dj00118fe34d9****",
  "productKey": "开发者平台定义产品对应的产品 Key",
  "bizCode": "online",
  "bizData": {
    "time": 146052438362
  }
}
  • 设备离线:
    bizData说明

技术图片
数据示例

  {
    "devId": "002dj00118fe34d9****",
    "productKey": "开发者平台定义产品对应的产品 Key",
    "bizCode": "offline",
    "bizData": {
      "time": 146052438362
    }
  }
  • 设备名变更
    bizData 说明

技术图片

数据示例

{
  "devId": "002dj00118fe34d9****",
  "productKey": "开发者平台定义产品对应的产品 Key",
  "bizCode": "nameUpdate",
  "bizData": {
    "devId": "002dj00118fe34d9****",
    "name": "new name"
  }
}
  • 设备 DP 名变更
    bizData 说明

技术图片
数据示例

{
  "devId": "002dj00118fe34d9****",
  "productKey": "开发者平台定义产品对应的产品 Key",
  "bizCode": "dpNameUpdate",
  "bizData": {
    "devId": "002dj00118fe34d9****",
    "name": "new name",
    "dpId": "dpId"
  }
}
  • 设备绑定
    bizData 说明

技术图片
数据示例

{
  "devId": "002dj00118fe34d9****",
  "productKey": "开发者平台定义产品对应的产品 Key",
  "bizCode": "bindUser",
  "bizData": {
    "devId": "002dj00118fe34d9****",
    "uuid": "06200043b4e618c1****",
    "uid":"ay1529485403390****",
    "token": "IIpQ****"
  }
}

设备移除
bizData 说明

技术图片

数据示例

{
  "devId": "002dj00118fe34d9****",
  "productKey": "开发者平台定义产品对应的产品 Key",
  "bizCode": "delete",
  "bizData": {
    "devId": "002dj00118fe34d9****",
    "uid":"ay1529485403390S****"
  }
}
  • 设备升级状态
    bizData 说明

技术图片
数据示例

{
    "bizCode":"upgradeStatus",
    "bizData": {
        "devId":"6ca8756d*****1b4ewsdn",  
        "moduleType":0,
        "upgradeStatus":2,
        "description":"升级测试",
        "oldVersion": "1.0.1",
        "newVersion": "1.0.2"         
    },
    "devId":"6ca8756d*****1b4ewsdn",
    "productKey":"vFHpaEFwu8UD****",
    "ts":1562232522192
}

以上是关于Pulsar 介绍的主要内容,如果未能解决你的问题,请参考以下文章

02_Pulsar的集群架构架构基本介绍Pulsar提供的组件介绍Brokers介绍Zookeeper的元数据存储基于bookKeeper持久化存储Pulsar代理

消息服务—顺带Pulsar简单介绍

05_Pulsar的主要组件介绍与命令使用名称空间Pulsar的topic相关操作Pulsar Topic(主题)相关操作_高级操作

Python之Pulsar框架使用

Pulsar 负载均衡设计

Apache Pulsar 之企业级特性-多租户介绍