消息服务—顺带Pulsar简单介绍
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息服务—顺带Pulsar简单介绍相关的知识,希望对你有一定的参考价值。
前序系列文章>>>
开放消息平台主要通过 Pulsar 主动推送各种事件数据给外部合作伙伴,以满足合作伙伴对消息实时性和消息持久化的要求。
一、Pulsar
? 对于Pulsar的介绍,大家可以看一下这位大佬的简单介绍>>>pulsar-介绍。简单来说,Pulsar最初由雅虎开发,现在由 Apache 软件基金会管理,是一个支持多租户、高性能的服务器到服务器之间消息通讯的解决方案。
? 涂鸦智能基于开源的 Pulsar 系统进行了定制改进,按照涂鸦智能提供的 Pulsar SDK 可完成消息接入。
? Pulsar 作为消息代理采用了Pub/Sub(发布订阅)的设计模式。该设计模式中,生产者将消息发布到主题,然后消费者可以订阅这些主题,处理传入消息,并在处理完成时发送确认。当订阅被创建时(即使消息处理设备已断开连接)所有的消息都将被 Pulsar 保留。只有在消息处理设备确认消息被成功处理后,保留下来的消息才会被丢弃。
? 此外,一个主题可以由多个消费者订阅,并且当消费者成功处理消息时,它需要向代理发送确认,以便代理可以丢弃该消息。涂鸦智能的 Pulsar 消息分发器(Broker)为每个主题分配了多个分区,Pulsar 消息分发器将根据分区和消费者分发消息。
二、相关说明
安全:
- 认证安全:涂鸦智能 Pulsar 消息系统针对身份认证进行了深度定制以满足高安全性要求,涂鸦智能采用动态令牌机制增强安全,开发者可忽略实现细节,基于涂鸦智能提供的 SDK 完成认证。
- 数据安全:
|- 传输安全:涂鸦智能 Pulsar 消息推送系统基于 SSL 传输数据。
|- 业务安全:业务数据均采用 AES-128 加密,请求均附上签名。
请求方式:
具体参考 ConsumerExample 代码。接入时需要提供以下三部分数据:
- username:填写云开发平台中 API 授权密钥的 Access ID。
- password:填写云开发平台中 API 授权密钥的 Access Secret。
- url:根据调用的区域进行选择。
中国区:pulsar+ssl://mqe.tuyacn.com:7285/
美国区:pulsar+ssl://mqe.tuyaus.com:7285/
欧洲区:pulsar+ssl://mqe.tuyaeu.com:7285/
印度区:pulsar+ssl://mqe.tuyain.com:7285/
开通方式:- 登录 IoT 工作台 > 云开发。
-
创建项目。
- 在项目中的消息订阅模块,开通消息订阅功能。
代码示例:
String url = "";
String accessId = "";
String accessKey = "";
MqConsumer mqConsumer = MqConsumer.build()
.serviceUrl(url)
.accessId(accessId)
.accessKey(accessKey)
.maxRedeliverCount(3)
.messageListener(new MqConsumer.IMessageListener() {
@Override
public void onMessageArrived(Message message) throws Exception {
//write your own message processing logic
}
});
mqConsumer.start();
数据格式:
{
"protocol": 4,
"pv": "2.0",
"t": 146052438362,
"data":"4FDEE3FE59FCD76E260F7115011D65C7FD2AF59BFA4DC29E5DDF3FDA6BD5447E02F679052C34BBAAB7BB0EFEED62C760FD2AF59BFA4DC29E5DDF3FDA6BD5447EC660C816075824E004EC0123DE4FD1B638BB633A478EB2C2004EF4289276****",
"sign": "58285279b5b5790c7d917de88b3e****"
}
数据签名:
在获得真正的data数据前,可以设置防篡改 MD5 签名校验。签名算法执行步骤如下:
- 将收到的 JSON 格式的每个参数(除sign和值为空外)格式化为key=val。
- 进行组装(使用key升序)。组装后的字符串格式示例:k1=v1||k2=v2。
- 添加密钥。例如:k1=v1||k2=v2...kn=vn||key。
- 进行整串字符串的 MD5 签名。
- 如果 MD5 值和收到数据的 sign 一致,则表明数据没有被篡改;否则视为被篡改。
签名校验通过后,对数据进行解密:
- 先对数据进行 Base64 解码。
- 通过 AES (ECB 模式)对 accessKey 的中间 16 位代码进行解密 ,从而得到真正的设备状态数据。
解密后数据格式如下:
{
"devId": "002dj00118fe34d9****",
"productKey": "开发者平台定义产品对应的产品 Key",
"dataId":"1459168450ddfdfoiopiopi****",//全局唯一 ID, 数据上报唯一标识 ID
"status": [
{
"code":"switch",
"value":false,
"mode":"rw",
"t":146052438362
},
{
"code":"work_mode",
"value":"colour",
"mode":"rw",
"t":146052438362
}
]
}
三、业务数据
协议号:协议号(不同协议号代表了不同的功能)。
设备数据上报事件:
{
"devId": "002dj00118fe34d9****",
"productKey": "开发者平台定义产品对应的产品 Key",
"dataId": "1459168450ddfdfoiopiopi****",
"status": [
{
"数据点编码1(产品定义时对应的数据点编码)": "数据点对应的值",
"t": 1540615024283 // 数据点状态发生时间
},
{
"数据点编码2(产品定义时对应的数据点编码)": "数据点对应的值",
"t": 1540615024283 // 数据点状态发生时间
}
]
}
其他事件:
数据示例
{
"devId": "002dj00118fe34d9****",
"productKey": "开发者平台定义产品对应的产品 Key",
"bizCode": "online",
"bizData": {
"time": 146052438362
}
}
- 设备离线:
bizData说明
数据示例
{
"devId": "002dj00118fe34d9****",
"productKey": "开发者平台定义产品对应的产品 Key",
"bizCode": "offline",
"bizData": {
"time": 146052438362
}
}
- 设备名变更
bizData 说明
数据示例
{
"devId": "002dj00118fe34d9****",
"productKey": "开发者平台定义产品对应的产品 Key",
"bizCode": "nameUpdate",
"bizData": {
"devId": "002dj00118fe34d9****",
"name": "new name"
}
}
- 设备 DP 名变更
bizData 说明
数据示例
{
"devId": "002dj00118fe34d9****",
"productKey": "开发者平台定义产品对应的产品 Key",
"bizCode": "dpNameUpdate",
"bizData": {
"devId": "002dj00118fe34d9****",
"name": "new name",
"dpId": "dpId"
}
}
- 设备绑定
bizData 说明
数据示例
{
"devId": "002dj00118fe34d9****",
"productKey": "开发者平台定义产品对应的产品 Key",
"bizCode": "bindUser",
"bizData": {
"devId": "002dj00118fe34d9****",
"uuid": "06200043b4e618c1****",
"uid":"ay1529485403390****",
"token": "IIpQ****"
}
}
设备移除
bizData 说明
数据示例
{
"devId": "002dj00118fe34d9****",
"productKey": "开发者平台定义产品对应的产品 Key",
"bizCode": "delete",
"bizData": {
"devId": "002dj00118fe34d9****",
"uid":"ay1529485403390S****"
}
}
- 设备升级状态
bizData 说明
数据示例
{
"bizCode":"upgradeStatus",
"bizData": {
"devId":"6ca8756d*****1b4ewsdn",
"moduleType":0,
"upgradeStatus":2,
"description":"升级测试",
"oldVersion": "1.0.1",
"newVersion": "1.0.2"
},
"devId":"6ca8756d*****1b4ewsdn",
"productKey":"vFHpaEFwu8UD****",
"ts":1562232522192
}
以上是关于消息服务—顺带Pulsar简单介绍的主要内容,如果未能解决你的问题,请参考以下文章
Spring Boot整合Pulsar生产和消费消息 简单示例代码
Spring Boot整合Pulsar生产和消费消息 简单示例代码