Python对阿里云物联网MQTT设备接入端开发

Posted 以梦为马&不负韶华

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python对阿里云物联网MQTT设备接入端开发相关的知识,希望对你有一定的参考价值。

文章目录


前言

MQTT是用于物联网(IoT)的OASIS标准消息传递协议。本文主要记录使用阿里云物联网平台中,网关设备接入

提示:需要理解Python paho-mqtt 模块,本文使用aliyun-iot-linkkit实现,适用于Django环境下
建议先看完我的另一篇文章阿里云物联网平台使用,在进行使用

一、快速搭建Python对阿里云物联网MQTT设备接入端代码实现

先上代码

1. mqtt子应用下view.py 主文件

import json
import logging
import sys
import threading
import time

from linkkit import linkkit

logger = logging.getLogger('django')

# 来自一机一密的设备
options = "ProductKey": "xxxxxxxxxxx",
           "DeviceName": "device-name",
           "DeviceSecret": "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy"
           

# 示例代码配置设备的设备证书以及连接的公共示例的RegionID
lk = linkkit.LinkKit(
    host_name="cn-shanghai",  # 华东2(上海),根据自己的RegionID
    product_key=options["ProductKey"],
    device_name=options["DeviceName"],
    device_secret=options["DeviceSecret"])


def on_connect(session_flag, rc, userdata):
    """
    callback after connect_async
    :param session_flag: type:int description:is previous connect session,0 new session; 1 previous session
    :param rc: type:int rc的值决定了连接成功或者不成功:
    :param userdata: type:  description:same as LinkKit input parameter user_data
    """
    print("on_connect:%d,rc:%d,userdata:" % (session_flag, rc))
    if rc == 0:
        # 连接成功
        print("Connection successful")
    elif rc == 1:
        # 协议版本错误
        print("Protocol version error")
    elif rc == 2:
        # 无效的客户端标识
        print("Invalid client identity")
    elif rc == 3:
        # 服务器无法使用
        print("server unavailable")
    elif rc == 4:
        # 错误的用户名或密码
        print("Wrong user name or password")
    elif rc == 5:
        # 未经授权
        print("unaccredited")
    print("Connect with the result code " + str(rc))


def on_disconnect(rc, userdata):
    """
    callback after connection disconnect
    :param rc: type:int description:0 success call for disconnect,1 network error
    :param userdata: type: description:same as LinkKit input parameter user_data
    """
    print("on_disconnect:rc:%d,userdata:" % rc)
    if rc != 0:
        print("Unexpected disconnection %s" % rc)


def on_topic_message(topic, payload, qos, userdata):
    """
    callback after subscribe_topic call
    :param topic: 订阅的主题
    :param payload: 内容
    :param qos: 质量服务等级
    :param userdata:
    """
    print("on_topic_message:" + topic + " payload:" + str(payload) + " qos:" + str(qos))
    json_msg = json.loads(payload.decode('utf-8'))  # 一般为json数据
    try:
        # 接收到消息后的业务逻辑,同时处理多任务亦可以采用异步、线程池等
        pass
    except Exception as e:
        print('No this moType', e)


def on_subscribe_topic(mid, granted_qos, userdata):
    """
    callback after subscribe_topic call
    :param mid: type: int description:publish message id
    :param granted_qos: type:list(int) description: corresponding to subscribe_topic parameter topic,0 represent qos=0,1 represent qos=1,128 represent subscribe error
    :param userdata: type: description:same as LinkKit input parameter user_data
    """
    print("on_subscribe_topic mid:%d, granted_qos:%s" %
          (mid, str(','.join('%s' % it for it in granted_qos))))
    print(granted_qos)
    if granted_qos == 128:
        print("订阅失败")


def on_unsubscribe_topic(mid, userdata):
    """
    callback after unsubscribe topic
    :param mid: type: int description:publish message id
    :param userdata: type: description:same as LinkKit input parameter user_data
    """
    print("on_unsubscribe_topic mid:%d" % mid)
    pass


def on_publish_topic(mid, userdata):
    """
    callback after publish_topic call
    :param mid: type: int description:publish message id
    :param userdata: type: description:same as LinkKit input parameter user_data
    """
    print("on_publish_topic mid:%d" % mid)


# mqtt发布启动函数
def mqtt_publish(sensor_data, topic='defult', qos=0):
    try:
        rc, mid = lk.publish_topic(lk.to_full_topic("user/update"), sensor_data)
        print("mqtt_publish:已启动...", "user/update", sensor_data)
        return
    except KeyboardInterrupt:
        print("EXIT")
        # 这是网络循环的阻塞形式,直到客户端调用disconnect()时才会返回。它会自动处理重新连接。
        lk.on_disconnect()
        sys.exit(0)


# 启动函数
def mqtt_run():
    # 账号密码验证放到最前面
    # client.username_pw_set('user', 'user')
    # client = mqtt.Client()
    # 建立mqtt连接
    # 注册接收到云端数据的方法
    lk.on_connect = on_connect
    # 注册取消接收到云端数据的方法
    lk.on_disconnect = on_disconnect
    # 如果产品生产时错误地将一个三元组烧写到了多个设备,多个设备将会被物联网平台认为是同一个设备,
    # 从而出现一个设备上线将另外一个设备的连接断开的情况。用户可以将自己的接口信息上传到云端,那么云端可以通过接口的信息来进行问题定位。
    lk.config_device_info("Eth|03ACDEFF0032|Eth|03ACDEFF0031")
    # 企业实例域名配置的更改
    lk.config_mqtt(secure="", endpoint="iot-060a085o.mqtt.iothub.aliyuncs.com")
    # 注册云端订阅的方法
    lk.on_subscribe_topic = on_subscribe_topic
    # 注册当接受到云端发送的数据的时候的方法
    lk.on_topic_message = on_topic_message
    # 注册向云端发布数据的时候顺便所调用的方法
    lk.on_publish_topic = on_publish_topic
    # 注册取消云端订阅的方法
    lk.on_unsubscribe_topic = on_unsubscribe_topic
    # 连接阿里云的函数(异步调用)
    lk.connect_async()
    # 因为他是他是异步调用需要时间所以如果没有这个延时函数的话,他就会出现not in connected state的错误
    time.sleep(2)
    # 订阅这个topic,不需要写prodect_key和device_name
    rc, mid = lk.subscribe_topic(lk.to_full_topic("user/get"))


2. wsgi.py

为mqtt服务创建一个线程

import threading
from apps.mqtt import views

# 启用多线程 运行脱机主控模块
thread_mqtt_run = threading.Thread(target=views.mqtt_run)
thread_mqtt_run.start()

二、具体使用介绍

1. 安装包

安装Python对接mqtt协议库,paho-mqtt

pip install paho-mqtt 

安装阿里云物联网封装paho-mqtt后的库,aliyun-iot-linkkit

pip install aliyun-iot-linkkit 

2. 设备认证,一机一密型接入(11-22行)

另一篇文章阿里云物联网平台使用有介绍。
从创建好的设备中,找一个设备证书,一键复制传入
注意:host_name是阿里云上你买的服务地址,“地域和可用区中查看对应的RegionID,公共实例和企业实例还有区别。

如果需要改变MQTT连接的一些默认参数,可以通过config_mqtt 指定端口等连接参数,如下所示:

config_mqtt(self, port=1883, protocol="MQTTv311", transport="TCP",
            secure="TLS", keep_alive=60, clean_session=True,
            max_inflight_message=20, max_queued_message=0,
            auto_reconnect_min_sec=1,
            auto_reconnect_max_sec=60,
            cadata=None):

3. 回调函数

用户可以在回调中加入自己的业务处理逻辑。
如果不够清晰,可自行查看Python Link SDK中API列表信息。

3.1 on_connect,设备连接云端成功后会通过on_connect回调函数通知用户(25-51行)


135行,为注册该回调函数

3.2 on_disconnect ,连接成功以后如果连接断开会通过on_disconnect 回调通知用户(54-62行)


137行,为注册该回调函数

4. 订阅云端消息(82-93行,155-156行)

订阅结果通过on_subscribe_topic通知用户:

5. 接收与处理来自云端的消息(65-79行,145-146行)

通过on_topic_message()回调告知用户

6. 发送消息到云端(106-112行,115-125行,147-148行)

发布消息结果通知
消息发送后,云端是否成功接收通过on_publish_topic回调通知用户:


发送消息
通过调用publish_topic()实现将消息发送到云端:
核心为:rc, mid = lk.publish_topic(lk.to_full_topic(“user/pub”), “123”)
此处为个人封装,上送时随时调取函数

7. 取消消息订阅(95-103行,149-150行)

通过调用unsubscribe_topic()取消对指定topic消息的订阅:
本文暂时没用用到,如有需要,按照如下写即可
rc, mid = lk.unsubscribe_topic(lk.to_full_topic(“user/test”))

取消订阅的结果通过on_unsubscribe_topic回调通知用户:

总结

希望大家多多交流,一起进步。
您的点赞是我坚持的动力,

以上是关于Python对阿里云物联网MQTT设备接入端开发的主要内容,如果未能解决你的问题,请参考以下文章

Python对阿里云物联网MQTT设备接入端开发

Python对阿里云物联网MQTT设备接入端开发

Python模拟智能开关设备接入阿里云物联网平台

IoT设备接入:阿里云物联网平台体验

MQTT——使用MQTT.fx工具接入阿里云物联网平台,实现基本通信调测

MQTT——使用MQTT.fx工具接入阿里云物联网平台,实现基本通信调测