Python对阿里云物联网MQTT设备接入端开发
Posted 以梦为马&不负韶华
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python对阿里云物联网MQTT设备接入端开发相关的知识,希望对你有一定的参考价值。
前言
MQTT是用于物联网(IoT)的OASIS标准消息传递协议。本文主要记录使用阿里云物联网平台中,网关设备接入提示:需要理解Python paho-mqtt 模块,本文使用aliyun-iot-linkkit实现,适用于Django环境下
建议先看完我的另一篇文章阿里云物联网平台使用,在进行使用
一、快速搭建Python对阿里云物联网MQTT设备接入端代码实现
先上代码
1. mqtt子应用下view.py 主文件
import json
import logging
import sys
import threading
import time
from linkkit import linkkit
logger = logging.getLogger('django')
# 来自一机一密的设备
options = "ProductKey": "xxxxxxxxxxx",
"DeviceName": "device-name",
"DeviceSecret": "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy"
# 示例代码配置设备的设备证书以及连接的公共示例的RegionID
lk = linkkit.LinkKit(
host_name="cn-shanghai", # 华东2(上海),根据自己的RegionID
product_key=options["ProductKey"],
device_name=options["DeviceName"],
device_secret=options["DeviceSecret"])
def on_connect(session_flag, rc, userdata):
"""
callback after connect_async
:param session_flag: type:int description:is previous connect session,0 new session; 1 previous session
:param rc: type:int rc的值决定了连接成功或者不成功:
:param userdata: type: description:same as LinkKit input parameter user_data
"""
print("on_connect:%d,rc:%d,userdata:" % (session_flag, rc))
if rc == 0:
# 连接成功
print("Connection successful")
elif rc == 1:
# 协议版本错误
print("Protocol version error")
elif rc == 2:
# 无效的客户端标识
print("Invalid client identity")
elif rc == 3:
# 服务器无法使用
print("server unavailable")
elif rc == 4:
# 错误的用户名或密码
print("Wrong user name or password")
elif rc == 5:
# 未经授权
print("unaccredited")
print("Connect with the result code " + str(rc))
def on_disconnect(rc, userdata):
"""
callback after connection disconnect
:param rc: type:int description:0 success call for disconnect,1 network error
:param userdata: type: description:same as LinkKit input parameter user_data
"""
print("on_disconnect:rc:%d,userdata:" % rc)
if rc != 0:
print("Unexpected disconnection %s" % rc)
def on_topic_message(topic, payload, qos, userdata):
"""
callback after subscribe_topic call
:param topic: 订阅的主题
:param payload: 内容
:param qos: 质量服务等级
:param userdata:
"""
print("on_topic_message:" + topic + " payload:" + str(payload) + " qos:" + str(qos))
json_msg = json.loads(payload.decode('utf-8')) # 一般为json数据
try:
# 接收到消息后的业务逻辑,同时处理多任务亦可以采用异步、线程池等
pass
except Exception as e:
print('No this moType', e)
def on_subscribe_topic(mid, granted_qos, userdata):
"""
callback after subscribe_topic call
:param mid: type: int description:publish message id
:param granted_qos: type:list(int) description: corresponding to subscribe_topic parameter topic,0 represent qos=0,1 represent qos=1,128 represent subscribe error
:param userdata: type: description:same as LinkKit input parameter user_data
"""
print("on_subscribe_topic mid:%d, granted_qos:%s" %
(mid, str(','.join('%s' % it for it in granted_qos))))
print(granted_qos)
if granted_qos == 128:
print("订阅失败")
def on_unsubscribe_topic(mid, userdata):
"""
callback after unsubscribe topic
:param mid: type: int description:publish message id
:param userdata: type: description:same as LinkKit input parameter user_data
"""
print("on_unsubscribe_topic mid:%d" % mid)
pass
def on_publish_topic(mid, userdata):
"""
callback after publish_topic call
:param mid: type: int description:publish message id
:param userdata: type: description:same as LinkKit input parameter user_data
"""
print("on_publish_topic mid:%d" % mid)
# mqtt发布启动函数
def mqtt_publish(sensor_data, topic='defult', qos=0):
try:
rc, mid = lk.publish_topic(lk.to_full_topic("user/update"), sensor_data)
print("mqtt_publish:已启动...", "user/update", sensor_data)
return
except KeyboardInterrupt:
print("EXIT")
# 这是网络循环的阻塞形式,直到客户端调用disconnect()时才会返回。它会自动处理重新连接。
lk.on_disconnect()
sys.exit(0)
# 启动函数
def mqtt_run():
# 账号密码验证放到最前面
# client.username_pw_set('user', 'user')
# client = mqtt.Client()
# 建立mqtt连接
# 注册接收到云端数据的方法
lk.on_connect = on_connect
# 注册取消接收到云端数据的方法
lk.on_disconnect = on_disconnect
# 如果产品生产时错误地将一个三元组烧写到了多个设备,多个设备将会被物联网平台认为是同一个设备,
# 从而出现一个设备上线将另外一个设备的连接断开的情况。用户可以将自己的接口信息上传到云端,那么云端可以通过接口的信息来进行问题定位。
lk.config_device_info("Eth|03ACDEFF0032|Eth|03ACDEFF0031")
# 企业实例域名配置的更改
lk.config_mqtt(secure="", endpoint="iot-060a085o.mqtt.iothub.aliyuncs.com")
# 注册云端订阅的方法
lk.on_subscribe_topic = on_subscribe_topic
# 注册当接受到云端发送的数据的时候的方法
lk.on_topic_message = on_topic_message
# 注册向云端发布数据的时候顺便所调用的方法
lk.on_publish_topic = on_publish_topic
# 注册取消云端订阅的方法
lk.on_unsubscribe_topic = on_unsubscribe_topic
# 连接阿里云的函数(异步调用)
lk.connect_async()
# 因为他是他是异步调用需要时间所以如果没有这个延时函数的话,他就会出现not in connected state的错误
time.sleep(2)
# 订阅这个topic,不需要写prodect_key和device_name
rc, mid = lk.subscribe_topic(lk.to_full_topic("user/get"))
2. wsgi.py
为mqtt服务创建一个线程
import threading
from apps.mqtt import views
# 启用多线程 运行脱机主控模块
thread_mqtt_run = threading.Thread(target=views.mqtt_run)
thread_mqtt_run.start()
二、具体使用介绍
1. 安装包
安装Python对接mqtt协议库,paho-mqtt
pip install paho-mqtt
安装阿里云物联网封装paho-mqtt后的库,aliyun-iot-linkkit
pip install aliyun-iot-linkkit
2. 设备认证,一机一密型接入(11-22行)
另一篇文章阿里云物联网平台使用有介绍。
从创建好的设备中,找一个设备证书,一键复制传入
注意:host_name是阿里云上你买的服务地址,“地域和可用区中查看对应的RegionID,公共实例和企业实例还有区别。
如果需要改变MQTT连接的一些默认参数,可以通过config_mqtt 指定端口等连接参数,如下所示:
config_mqtt(self, port=1883, protocol="MQTTv311", transport="TCP",
secure="TLS", keep_alive=60, clean_session=True,
max_inflight_message=20, max_queued_message=0,
auto_reconnect_min_sec=1,
auto_reconnect_max_sec=60,
cadata=None):
3. 回调函数
用户可以在回调中加入自己的业务处理逻辑。
如果不够清晰,可自行查看Python Link SDK中API列表信息。
3.1 on_connect,设备连接云端成功后会通过on_connect回调函数通知用户(25-51行)
135行,为注册该回调函数
3.2 on_disconnect ,连接成功以后如果连接断开会通过on_disconnect 回调通知用户(54-62行)
137行,为注册该回调函数
4. 订阅云端消息(82-93行,155-156行)
订阅结果通过on_subscribe_topic通知用户:
5. 接收与处理来自云端的消息(65-79行,145-146行)
通过on_topic_message()回调告知用户
6. 发送消息到云端(106-112行,115-125行,147-148行)
发布消息结果通知
消息发送后,云端是否成功接收通过on_publish_topic回调通知用户:
发送消息
通过调用publish_topic()实现将消息发送到云端:
核心为:rc, mid = lk.publish_topic(lk.to_full_topic(“user/pub”), “123”)
此处为个人封装,上送时随时调取函数
7. 取消消息订阅(95-103行,149-150行)
通过调用unsubscribe_topic()取消对指定topic消息的订阅:
本文暂时没用用到,如有需要,按照如下写即可
rc, mid = lk.unsubscribe_topic(lk.to_full_topic(“user/test”))
取消订阅的结果通过on_unsubscribe_topic回调通知用户:
总结
希望大家多多交流,一起进步。
您的点赞是我坚持的动力,
以上是关于Python对阿里云物联网MQTT设备接入端开发的主要内容,如果未能解决你的问题,请参考以下文章