消息服务—顺带Pulsar简单介绍

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息服务—顺带Pulsar简单介绍相关的知识,希望对你有一定的参考价值。

前序系列文章>>>

标准指令集

开放消息平台主要通过 Pulsar 主动推送各种事件数据给外部合作伙伴,以满足合作伙伴对消息实时性和消息持久化的要求。

一、Pulsar

? 对于Pulsar的介绍,大家可以看一下这位大佬的简单介绍>>>pulsar-介绍。简单来说,Pulsar最初由雅虎开发,现在由 Apache 软件基金会管理,是一个支持多租户、高性能的服务器到服务器之间消息通讯的解决方案。

? 涂鸦智能基于开源的 Pulsar 系统进行了定制改进,按照涂鸦智能提供的 Pulsar SDK 可完成消息接入。

? Pulsar 作为消息代理采用了Pub/Sub(发布订阅)的设计模式。该设计模式中,生产者将消息发布到主题,然后消费者可以订阅这些主题,处理传入消息,并在处理完成时发送确认。当订阅被创建时(即使消息处理设备已断开连接)所有的消息都将被 Pulsar 保留。只有在消息处理设备确认消息被成功处理后,保留下来的消息才会被丢弃。

? 此外,一个主题可以由多个消费者订阅,并且当消费者成功处理消息时,它需要向代理发送确认,以便代理可以丢弃该消息。涂鸦智能的 Pulsar 消息分发器(Broker)为每个主题分配了多个分区,Pulsar 消息分发器将根据分区和消费者分发消息。

二、相关说明

安全:

  • 认证安全:涂鸦智能 Pulsar 消息系统针对身份认证进行了深度定制以满足高安全性要求,涂鸦智能采用动态令牌机制增强安全,开发者可忽略实现细节,基于涂鸦智能提供的 SDK 完成认证。
  • 数据安全:
    |- 传输安全:涂鸦智能 Pulsar 消息推送系统基于 SSL 传输数据。
    |- 业务安全:业务数据均采用 AES-128 加密,请求均附上签名。
    请求方式:

具体参考 ConsumerExample 代码。接入时需要提供以下三部分数据:

  • username:填写云开发平台中 API 授权密钥的 Access ID。
  • password:填写云开发平台中 API 授权密钥的 Access Secret。
  • url:根据调用的区域进行选择。
    中国区:pulsar+ssl://mqe.tuyacn.com:7285/
    美国区:pulsar+ssl://mqe.tuyaus.com:7285/
    欧洲区:pulsar+ssl://mqe.tuyaeu.com:7285/
    印度区:pulsar+ssl://mqe.tuyain.com:7285/

    开通方式:
    1. 登录 IoT 工作台 > 云开发。
  1. 创建项目。

  2. 在项目中的消息订阅模块,开通消息订阅功能。

代码示例:

String url = "";
String accessId = "";
String accessKey = "";
MqConsumer mqConsumer = MqConsumer.build()
    .serviceUrl(url)
    .accessId(accessId)
    .accessKey(accessKey)
    .maxRedeliverCount(3)
    .messageListener(new MqConsumer.IMessageListener() {
     @Override
     public void onMessageArrived(Message message) throws Exception {
       //write your own message processing logic
     }
    });
mqConsumer.start();

数据格式:

技术图片

{
   "protocol": 4,
   "pv": "2.0",
   "t": 146052438362,
"data":"4FDEE3FE59FCD76E260F7115011D65C7FD2AF59BFA4DC29E5DDF3FDA6BD5447E02F679052C34BBAAB7BB0EFEED62C760FD2AF59BFA4DC29E5DDF3FDA6BD5447EC660C816075824E004EC0123DE4FD1B638BB633A478EB2C2004EF4289276****",
   "sign": "58285279b5b5790c7d917de88b3e****"
}

数据签名:

在获得真正的data数据前,可以设置防篡改 MD5 签名校验。签名算法执行步骤如下:

  1. 将收到的 JSON 格式的每个参数(除sign和值为空外)格式化为key=val。
  2. 进行组装(使用key升序)。组装后的字符串格式示例:k1=v1||k2=v2。
  3. 添加密钥。例如:k1=v1||k2=v2...kn=vn||key。
  4. 进行整串字符串的 MD5 签名。
  5. 如果 MD5 值和收到数据的 sign 一致,则表明数据没有被篡改;否则视为被篡改。

签名校验通过后,对数据进行解密:

  1. 先对数据进行 Base64 解码。
  2. 通过 AES (ECB 模式)对 accessKey 的中间 16 位代码进行解密 ,从而得到真正的设备状态数据。

解密后数据格式如下:

{
    "devId": "002dj00118fe34d9****",
    "productKey": "开发者平台定义产品对应的产品 Key",
    "dataId":"1459168450ddfdfoiopiopi****",//全局唯一 ID, 数据上报唯一标识 ID
    "status": [
    {
      "code":"switch",
      "value":false,
      "mode":"rw",
      "t":146052438362
    },
    {
      "code":"work_mode",
      "value":"colour",
      "mode":"rw",
      "t":146052438362
    }
  ]
}


三、业务数据

协议号:协议号(不同协议号代表了不同的功能)。

技术图片

设备数据上报事件:

{
  "devId": "002dj00118fe34d9****",
  "productKey": "开发者平台定义产品对应的产品 Key",
  "dataId": "1459168450ddfdfoiopiopi****",
  "status": [
    {
      "数据点编码1(产品定义时对应的数据点编码)": "数据点对应的值",
      "t": 1540615024283  // 数据点状态发生时间
    },
    {
      "数据点编码2(产品定义时对应的数据点编码)": "数据点对应的值",
      "t": 1540615024283 // 数据点状态发生时间
    }
  ]
}

其他事件:

技术图片
数据示例


{
  "devId": "002dj00118fe34d9****",
  "productKey": "开发者平台定义产品对应的产品 Key",
  "bizCode": "online",
  "bizData": {
    "time": 146052438362
  }
}
  • 设备离线:
    bizData说明

技术图片
数据示例

  {
    "devId": "002dj00118fe34d9****",
    "productKey": "开发者平台定义产品对应的产品 Key",
    "bizCode": "offline",
    "bizData": {
      "time": 146052438362
    }
  }
  • 设备名变更
    bizData 说明

技术图片

数据示例

{
  "devId": "002dj00118fe34d9****",
  "productKey": "开发者平台定义产品对应的产品 Key",
  "bizCode": "nameUpdate",
  "bizData": {
    "devId": "002dj00118fe34d9****",
    "name": "new name"
  }
}
  • 设备 DP 名变更
    bizData 说明

技术图片
数据示例

{
  "devId": "002dj00118fe34d9****",
  "productKey": "开发者平台定义产品对应的产品 Key",
  "bizCode": "dpNameUpdate",
  "bizData": {
    "devId": "002dj00118fe34d9****",
    "name": "new name",
    "dpId": "dpId"
  }
}
  • 设备绑定
    bizData 说明

技术图片
数据示例

{
  "devId": "002dj00118fe34d9****",
  "productKey": "开发者平台定义产品对应的产品 Key",
  "bizCode": "bindUser",
  "bizData": {
    "devId": "002dj00118fe34d9****",
    "uuid": "06200043b4e618c1****",
    "uid":"ay1529485403390****",
    "token": "IIpQ****"
  }
}

设备移除
bizData 说明

技术图片

数据示例

{
  "devId": "002dj00118fe34d9****",
  "productKey": "开发者平台定义产品对应的产品 Key",
  "bizCode": "delete",
  "bizData": {
    "devId": "002dj00118fe34d9****",
    "uid":"ay1529485403390S****"
  }
}
  • 设备升级状态
    bizData 说明

技术图片
数据示例

{
    "bizCode":"upgradeStatus",
    "bizData": {
        "devId":"6ca8756d*****1b4ewsdn",  
        "moduleType":0,
        "upgradeStatus":2,
        "description":"升级测试",
        "oldVersion": "1.0.1",
        "newVersion": "1.0.2"         
    },
    "devId":"6ca8756d*****1b4ewsdn",
    "productKey":"vFHpaEFwu8UD****",
    "ts":1562232522192
}

以上是关于消息服务—顺带Pulsar简单介绍的主要内容,如果未能解决你的问题,请参考以下文章

Spring Boot整合Pulsar生产和消费消息 简单示例代码

Spring Boot整合Pulsar生产和消费消息 简单示例代码

主流消息队列Kafka和下一代云原生消息平台Pulsar架构分析

云原生消息队列 Pulsar 浅析

Pulsar 介绍

Pulsar介绍