EC600 QuecPython接入第三方MQTT服务器, 以阿里云物联网平台为例,可替换为自己创建的MQTT服务器
Posted Mark_md
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了EC600 QuecPython接入第三方MQTT服务器, 以阿里云物联网平台为例,可替换为自己创建的MQTT服务器相关的知识,希望对你有一定的参考价值。
EC600 QuecPython 自带Aliyun
、腾讯云
物联网平台的接入库,但重口难调,产品项目中更多的是接入自己搭建的第三方服务器。
MQTT
协议接入不同的服务器,最大的区别在于connect
报文。
connect
报文的载荷中,包含了设备的登录账号,而每个平台都有自己的设备管理方式,也就造成编码方式的不一致。不过好在MQTT
也规定了 clientId、userName、passWord
这几个关键载荷,用户只要保证这几个部分与平台匹配即可连接。
使用 EC600 QuecPython 接入第三方MQTT服务器,需要使用 umqtt
模块。
这里为方便测试,使用阿里云账号,不过不用 aliyun
的MQTT接入库,直接用 umqtt
模块。
- EC600 umqtt 使用帮助:QuecPython - 在线API文档
- 本篇demo使用的测试设备账号、以及密码的合成方式:网络调试助手接入阿里云MQTT物联网平台,逐字节讲解各字段合成方式
- 阿里云物联网平台的设备添加:阿里云物联网平台注册、添加产品、设备
上述的设备账号会一直保留,方便测试,各位可放心使用。
EC600 MQTT脚本
接入第三方MQTT服务器,需要根据自己的设备及服务器,自行计算并替换:clientId、userName、passWord、brokerUrl
以及 topic
。(阿里云的 clientI、userName、passWord
计算方式见上述链接)
Demo代码:
from machine import Pin
import log
import checkNet
from umqtt import MQTTClient
# 使用提前计算好的密码,可以登录连接。hamc库暂时使用有问题,待官方的固件库更新
# 使用umqtt库接入阿里云物联网平台,模拟接入第三方MQTT服务器进行测试
clientId = 'co_0001|securemode=3,signmethod=hmacsha1|'
userName ='co_0001&a1wFylTxYeD'
passWord = 'e782b5e55b37655c27812a60c307b0a7575d8f6d'
brokerUrl = 'a1wFylTxYeD.iot-as-mqtt.cn-shanghai.aliyuncs.com'
topic_post = b'/sys/a1wFylTxYeD/co_0001/thing/event/property/post'
topic_set = b'/sys/a1wFylTxYeD/co_0001/thing/service/property/set'
PROJECT_NAME = "MQTT_example"
PROJECT_VERSION = "1.0.0"
# 检查网络状态,创建checkNet对象
checknet = checkNet.CheckNetwork(PROJECT_NAME, PROJECT_VERSION)
# 设置日志输出级别,仅输出等级高于INFO的日志结果
log.basicConfig(level=log.INFO)
log = log.getLogger("MQTT")
state = 0
# V1.2 Demo板上的led - D6
blink = False
led = Pin(Pin.GPIO24, Pin.OUT, Pin.PULL_DISABLE, 0)
# 设置MQTT接收消息回调
def sub_cb(topic, msg):
global state
log.info("Subscribe Recv: Topic={},Msg={}".format(topic.decode(), msg.decode()))
state = 1
# 收到下发的消息时,LED亮灭状态会发生变化
global blink
blink = bool(1-blink)
led.write(blink)
if __name__ == '__main__':
# 上电运行后调试输出 项目名称、项目版本号、固件版本号、开机原因、SIM卡状态
checknet.poweron_print_once()
print('Hardware PowerON!')
# 开机后LED亮起,指示状态。
led.write(1)
# 阻塞等待网络就绪,超时等待30s
stagecode, subcode = checknet.wait_network_connected(30)
if stagecode == 3 and subcode == 1:
# 网络准备就绪,开始执行用户代码
log.info('Network connection successful! Then connect to Aliyun MQTT')
# 指示LED灭掉,提示网络连接正常。
led.write(0)
print('username:', userName)
print('password:', passWord)
# 创建一个mqtt实例
c = MQTTClient(
client_id=clientId,
server=brokerUrl,
port=1883,
user=userName,
password=passWord,
keepalive=60)
# 设置消息回调
c.set_callback(sub_cb)
# 建立连接
try:
c.connect()
except Exception as e:
print('!!!,e=%s' % e)
# 正常连接后,输出消息
log.info('Aliyun MQTT connected , Then subscribe topic')
# 订阅主题
c.subscribe(topic_post)
log.info('subscribe topic: %s' % topic_post)
c.subscribe(topic_set)
log.info('subscribe topic: %s' % topic_set)
# 发布消息
c.publish(topic_post, 'test publish')
log.info('Publish topic: %s, Msg: %s' % (topic_post, 'test publish'))
log.info('listen')
while True:
c.wait_msg() # 阻塞函数,监听消息
else:
log.info('Network connection failed! stagecode = {}, subcode = {}'.format(stagecode, subcode))
运行现象
SIM插入、4G网络正常,正常运行的现象。
log会显示正常连接、订阅topic 和 publish。最后在 publish 一条消息后,一直处于监听状态。
- 连接后能在管理页面看到设备在线。
- 测试下发控制消息
- 观察 V1.2 Demo板上的led - D6,收到下发的消息时的同时,LED亮灭状态也会发生变化。(led亮灭是脚本中自己加入的代码)
注意事项
- hmac 暂时有问题,需要等待固件更新,才可以让设备自行合成 passWord。
- 接入第三方MQTT服务器,需要自行计算
clientId、userName、passWord、brokerUrl
以及topic
,并替换。 umqtt
的异常提醒很坑,如设备账号填错,会报错bytes index out of range
。而非重新连接。- 代码中如有 while,会导致与上位机的通信阻塞。重新下载脚本时,需要断开连接、复位、再重新连接。(如带有while 的脚本文件名为 main.py,则会上电自运行,只能通过重刷固件解决)
以上是关于EC600 QuecPython接入第三方MQTT服务器, 以阿里云物联网平台为例,可替换为自己创建的MQTT服务器的主要内容,如果未能解决你的问题,请参考以下文章
EC600 QuecPython下载脚本代码到开发板设置开机自运行
移远EC600低功耗唤醒锁wakelock的使用,QuecPython功耗分析
移远EC600低功耗唤醒锁wakelock的使用,QuecPython功耗分析
安信可NB-IoT模组EC系列AT指令应用笔记②MQTT接入阿里云