EMQX+HStreamDB 实现物联网流数据高效持久化
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了EMQX+HStreamDB 实现物联网流数据高效持久化相关的知识,希望对你有一定的参考价值。
在 IoT 场景中,通常面临设备数量庞大、数据产生速率高、累积数据量巨大等挑战。因此,如何接入、存储和处理这些海量设备数据就成为了一个关键的问题。
EMQX 作为一款强大的物联网 MQTT 消息服务器,单个集群可处理上亿设备连接,同时提供了丰富的数据集成功能。HStreamDB 作为一款分布式流数据库,不仅可以高效存储来自 EMQX 的海量设备数据,而且提供实时处理分析能力。EMQX 与 HStreamDB 都具备高可扩展性和可靠性,两者结合不仅能够满足大规模 IoT 应用的性能和稳定性需求,同时能够提升应用的实时性。
近期 EMQX Enterprise 4.4.15 发布,更新了对 HStreamDB 最新版本的支持,本文将具体介绍如何通过 EMQX 规则引擎将数据持久化到 HStreamDB,实现 MQTT 数据流的存储与实时处理。
注:本文介绍的集成步骤基于 EMQX 4.4.15 和 HStreamDB 0.14.0 以上版本。
连接到 HStreamDB 集群
在下面的教程中,我们假设有一个正在运行的 EMQX Enterprise 集群和正在运行的 HStreamDB 集群。如需部署 EMQX Enterprise 集群,请参考 EMQX Enterprise docs。如需部署 HStreamDB 集群,请参考 HStreamDB docs,其中包含关于如何用 Docker 快速部署的说明。
我们可以通过 Docker 来部署 HStreamDB 客户端并连接到 HStreamDB 集群:
# 获取帮助信息
docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream --help
我们在此使用 hstream stream
命令创建一个 stream,供接下来的示例使用:
# 使用 hstream stream 命令创建 streams
docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream stream create basic_condition_info_0 -r 3 -b $(( 7 * 24 * 60 * 60 ))
接下来,连接到 HStreamDB 集群,启动交互式 HStream SQL shell:
docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream sql --service-url "<<YOUR-SERVICE-URL>>"
# 如果要使用安全连接,还需要填写 --tls-ca, --tls-key, --tls-cert 参数
如果连接成功,将会出现
__ _________________ _________ __ ___
/ / / / ___/_ __/ __ \\/ ____/ | / |/ /
/ /_/ /\\__ \\ / / / /_/ / __/ / /| | / /|_/ /
/ __ /___/ // / / _, _/ /___/ ___ |/ / / /
/_/ /_//____//_/ /_/ |_/_____/_/ |_/_/ /_/
Command
:h To show these help info
:q To exit command line interface
:help [sql_operation] To show full usage of sql statement
SQL STATEMENTS:
To create a simplest stream:
CREATE STREAM stream_name;
To create a query select all fields from a stream:
SELECT * FROM stream_name EMIT CHANGES;
To insert values to a stream:
INSERT INTO stream_name (field1, field2) VALUES (1, 2);
可以使用 show streams;
来查看已经创建的 streams 的信息:
> show streams;
+-------------------------------------------+---------+----------------+-------------+
| Stream Name | Replica | Retention Time | Shard Count |
+-------------------------------------------+---------+----------------+-------------+
| basic_condition_info_0 | 3 | 604800 seconds | 1 |
+-------------------------------------------+---------+----------------+-------------+
创建 HStreamDB 资源
在利用 EMQX 规则引擎将数据持久化到 HStreamDB 之前,需要创建一个 HStreamDB 资源。
为此,请访问 EMQX Dashboard,单击 规则引擎
-> 资源
→ 创建
,选择 HStreamDB 资源
,输入 HStreamDB 地址并填写必要的选项。可用选项如下表:
在选择开启 SSL 时,会出现额外的 SSL 配置界面,可以粘贴所需配置内容或上传文件。
创建数据持久化到 HStreamDB 的规则
点击 规则引擎
-> 规则
-> 创建
。
编辑 SQL 规则并添加操作,您可以在字符串模板中使用 SQL 变量。
请注意,本文档中介绍的 SQL 规则仅供演示,实际的 SQL 应根据业务设计进行编写。
单击 添加操作
,选择「数据持久化」以将数据保存到 HStreamDB 中。选择上一步创建的资源并输入参数。可用参数如下表:
点击 确定
来确认添加行为。
在 HStream SQL Shell 中获取实时的数据更新
从 EMQX 规则引擎持久化到 HStreamDB 的数据可以使用 HStream SQL Shell 实时读出新写入 stream 的内容。现在,数据已经被写入 HStreamDB,可以使用任何消费方式来消费消息。文档使用了一个简单的消费方法:使用 HStream SQL shell 进行查询。此外,读者可以自由选择使用自己喜欢的编程语言 SDK 编写消费端。
# docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream sql
> select * from basic_condition_info_0 emit changes;
当前的 select
查询没有结果可供打印出,这是因为还没有数据通过 EMQX 的规则引擎向 HStreamDB 写入。一旦有数据写入,便可以在 HStream SQL shell 观察到数据的即时更新。目前在 HStreamDB 使用 SQL 对 streams 做查询,只会打印出创建查询后的结果。如果在 EMQX 停止向 HStreamDB 写入后创建查询,可能观察不到产生的结果。
向 EMQX 写入消息测试规则引擎
可以使用跨平台的桌面客户端 MQTT X 来连接到 EMQX 并发送消息:
从 EMQX Dashboard 获取规则引擎的运行数据指标
访问对应的规则引擎界面:
如果规则引擎运行数据指标正常,则代表 EMQX 会将数据持久化到 HStreamDB。一旦写入成功,便可以在前面步骤启动的 HStream SQL Shell 中看到实时的数据更新。
# docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream:v0.14.0 hstream sql
> select * from basic_condition_info_0 emit changes;
"current-number-of-people":247.0,"device-health":true,"number-of-people-in-line":14.0,"submitter":"admin-07","temperature":27.0
"current-number-of-people":220.0,"device-health":true,"number-of-people-in-line":13.0,"submitter":"admin-07","temperature":27.2
"current-number-of-people":135.0,"device-health":true,"number-of-people-in-line":2.0,"submitter":"admin-01","temperature":26.9
"current-number-of-people":137.0,"device-health":true,"number-of-people-in-line":0.0,"submitter":"admin-01","temperature":26.9
结语
至此,我们就完成了通过 EMQX 规则引擎将数据持久化到 HStreamDB 的主要流程。
将 EMQX 采集到的数据存储到 HStreamDB 后,可以对这些数据进行实时处理与分析,为上层 AI、大数据等应用提供支撑,进一步发掘和利用数据价值。作为首个专为流数据设计的云原生流数据库,HStreamDB 与 EMQX 结合可以实现一站式存储和实时处理海量物联网数据,精简物联网应用数据栈,加速企业的物联网应用开发。
版权声明: 本文为 EMQ 原创,转载请注明出处。
原文链接:https://www.emqx.com/zh/blog/integration-practice-of-emqx-and-hstreamdb
将 EMQX Cloud 数据通过公网桥接到 AWS IoT
在物联网业务场景中,往往涉及对海量物联设备产生数据的实时提取、过滤、分拆、转换等。EMQ 推出的全托管云原生 MQTT 消息服务 EMQX Cloud 提供了高性能的内置数据集成功能,可以帮助用户实现与各种云服务(如 Kafka、MongoDB、AWS RDS、AWS DocumentDB、 AWS IoT 等)的连接,将物联网数据根据需求转存到各类第三方数据库、消息队列、数据系统中,从而简化物联网应用开发,加速业务交付。
本文将介绍如何使用 EMQX Cloud 数据集成功能通过公网桥接数据到 AWS IoT,从而借助 AWS IoT 轻松使用 AWS Lambda、Amazon Kinesis、Amazon S3、Amazon Machine Learning、Amazon DynamoDB、Amazon CloudWatch、AWS CloudTrail 和内置 Kibana 集成的 Amazon Elasticsearch Service 等 AWS 服务构建 IoT 应用程序。无需管理任何基础设施,即可实现对互连设备生成数据的收集、处理和分析等相关操作。
AWS IoT 简介
什么是 AWS IoT
Amazon IoT Core 是一种托管的云平台,让互联设备可以轻松安全地与云应用程序和其他设备交互。Amazon IoT 可以支持数十亿台设备和数万亿条消息,并能处理这些消息并将其安全可靠地路由至亚马逊云科技终端节点和其他设备。借助 Amazon IoT,您的应用程序可以随时跟踪您的所有设备并与其通信,即使这些设备未处于连接状态也不例外。
AWS IoT 平台的优势
(1)广泛而深入:AWS 拥有从边缘到云端的广泛而深入的 IoT 服务,提供本地数据收集和分析能力以及云上专为 IoT 设计的数据管理和丰富分析集成服务。
(2)多层安全性:包括预防性安全机制(如设备数据的加密和访问控制)、持续监控和审核安全配置等。
(3)卓越的 AI 集成:AWS 将 AI 和 IoT 结合在一起,使设备更为智能化。支持多种机器学习框架。
(4)大规模得到验证:AWS IoT 构建于可扩展、安全且经过验证的云基础设施之上,可扩展到数十亿种不同的设备和数万亿条消息。
使用 EMQX Cloud 桥接数据到 AWS IoT
开通 NAT 网关
在 EMQX Cloud 部署详情页面,开通增值服务 --- NAT 网关,便于公网访问到 AWS IoT。
配置 AWS IoT
- 创建事务
进入 AWS IoT 控制面板,找到管理-事务,点击创建事务,即可创建一个名为 emqx 的事务。
- 创建并下载证书
在创建好事务以后,可直接创建一个证书。
证书创建完成以后,需要在该页面下载证书,用于设备连接时的双向认证。
- 创建策略并关联到证书
找到安全-策略,创建名为 emqx-bridge 的策略,编写策略,相关配置如下。
"Version": "2012-10-17",
"Statement": [
"Effect": "Allow",
"Action": "iot:Connect",
"Resource": "arn:aws:iot:us-east-1:845523974165:client/emqx-bridge_*"
,
"Effect": "Allow",
"Action": "iot:Publish",
"Resource": "arn:aws:iot:us-east-1:845523974165:topic/emqx/bridge"
,
"Effect": "Allow",
"Action": "iot:Receive",
"Resource": "arn:aws:iot:us-east-1:845523974165:topic/emqx/bridge"
,
"Effect": "Allow",
"Action": "iot:RetainPublish",
"Resource": "arn:aws:iot:us-east-1:845523974165:topic/emqx/bridge"
,
"Effect": "Allow",
"Action": "iot:Subscribe",
"Resource": "arn:aws:iot:us-east-1:845523974165:topicfilter/emqx/bridge"
]
完成策略创建以后,需要关联到前一步创建好的证书。
- 获取 AWS IoT 的公网连接地址
在设置获取到连接地址 endpoint,用于设备连接。
配置 EMQX Cloud 数据集成
进入 EMQX Cloud 的部署页面,点击数据集成 - MQTT Bridge。
在资源页面填写 AWS IoT 的资源详细信息。
确认资源可用以后,进行规则配置,筛选并处理数据。
配置好规则以后,需要配置响应动作,即桥接数据到 AWS IoT。
在完成创建资源 - 添加规则 - 添加动作以后,可在详情页面查看相关信息。
查看已创建的规则,点击监控,可查看到目前桥接成功监控次数为 0,即初始化状态。
测试验证
- 使用 Python SDK 连接到 EMQX Cloud 部署,向主题 emqx/bridge 发送消息。
- 使用 MQTTX 连接到 AWS IoT,订阅 emqx/bridge,可以接收到来自 EMQX Cloud 部署的消息。
- 在 EMQX Cloud console 查看规则监控,可以检查桥接数据到 AWS IoT 成功与否。
结语
至此,我们完成了使用 EMQX Cloud 数据集成功能通过公网桥接数据到 AWS IoT 的全部流程。EMQX Cloud 灵活的数据集成功能,结合 AWS IoT 丰富的应用生态,用户在数分钟内即可创建一款物联网应用。
版权声明: 本文为 EMQ 原创,转载请注明出处。
原文链接:https://www.emqx.com/zh/blog/bridging-emqx-cloud-data-to-aws-iot-over-the-public-network
以上是关于EMQX+HStreamDB 实现物联网流数据高效持久化的主要内容,如果未能解决你的问题,请参考以下文章
EMQX Newsletter 2022-06|与 HStreamDB 集成充电桩通信协议 OCPP 网关开发…
将 EMQX Cloud 数据通过公网桥接到 AWS IoT