《flask》flask+mqtt联动快速上手

Posted 帅气的黑桃J

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了《flask》flask+mqtt联动快速上手相关的知识,希望对你有一定的参考价值。

简介

本文旨在介绍如何快速上手联动flask + mqtt,本文将会给出一个简单的demo,用于演示在如何通过访问flask接口来触发mqtt,并在flask运行的基础的上对mqtt进行订阅。

快速上手

因为有项目需求,所以需要flask + mqtt进行联动,因为需要一直开启监听,所以需要一直挂在一个线程上,一开始想到用多线程做,或者说用异步协程来做,后来发现有一个关于flask的mqtt扩展库,因此为了快速上手,直接用该库进行开发,可以节省很多精力。

link: https://github.com/stlehmann/Flask-MQTT

首先,我们需要安装flask-mqtt库。

pip install flask-mqtt

下面放上一个简单的示例代码,里面包括了MQTT的订阅、发布,flask的访问。

"""  
A small Test application to show how to use Flask-MQTT.  
"""  
  
import eventlet  
from flask import Flask, render_template  
from flask_mqtt import Mqtt  
  
  
eventlet.monkey_patch()  
  
app = Flask(__name__)  
app.config['SECRET'] = 'my secret key'  
app.config['TEMPLATES_AUTO_RELOAD'] = True  
app.config['MQTT_BROKER_URL'] = 'broker.emqx.io'  
app.config['MQTT_BROKER_PORT'] = 1883  
app.config['MQTT_USERNAME'] = ''  
app.config['MQTT_PASSWORD'] = ''  
app.config['MQTT_KEEPALIVE'] = 5  
app.config['MQTT_TLS_ENABLED'] = False  
app.config['MQTT_CLEAN_SESSION'] = True  
  
  
mqtt = Mqtt(app)  
  
  
@app.route('/')  
def index():  
    return "hello mqtt_flask"  
  
@app.route('/hello')  
def hello():  
    mqtt.publish('hello', 'hello, this is flask')  
    print("[mqtt] publish successfully")  
    return "publish successfully"  
  
@mqtt.on_connect()  
def handle_connect(client, userdata, flags, rc):  
    mqtt.subscribe('hello')  
    print("[mqtt] has listen topic hello")  
  
  
@mqtt.on_message()  
def handle_mqtt_message(client, userdata, message):  
    data = dict(  
        topic=message.topic,  
        payload=message.payload.decode()  
    )  
    print(data)  
  
if __name__ == '__main__':  
    # important: Do not use reloader because this will create two Flask instances.  
    # Flask-MQTT only supports running with one instance    # socketio.run(app, host='0.0.0.0', port=5000, use_reloader=False, debug=False)    app.run()

订阅测试

这里用了MQTTX工具来进行测试,现在我已经连接到了一个MQTT服务器上,我将用MQTTX向该服务器发送一个topic为hello,payload为"msg": "Hello, This is mqtt server"的消息。

运行程序后,MQTTX发送消息,结果如下,可以看到,接收正常。

发布测试

现在我们访问http://localhost:5000/hello,flask服务器将会向我的MQTT服务器发送一个topic为hello,payload为"hello, this is flask"的消息,我们用MQTTX来监听。

从下图中可以看到,MQTTX成功订阅接收到了该消息,至此,关于FLASK+MQTT的必要流程已经可以走通了。

进一步测试

最初,笔者以为启动mqtt_client.loop_start()函数之后会线程堵塞,但是在使用 Paho MQTT Python 库时,调用 client.loop_start() 方法会在后台启动一个新线程,用于处理 MQTT 客户端的事件循环。这意味着您的代码不会堵塞,并且可以在 MQTT 客户端的事件循环运行时继续执行。

例如,您可以在调用 client.loop_start() 后使用以下代码继续执行其他操作:

import paho.mqtt.client as mqtt

# The callback for when a message is received from the server
def on_message(client, userdata, message):
    print(f"Received message 'message.payload.decode()' on topic 'message.topic'")

# Create an MQTT client
client = mqtt.Client()

# Set the on_message callback
client.on_message = on_message

# Connect to the MQTT server
client.connect("mqtt.example.com", 1883, 60)

# Subscribe to a topic
client.subscribe("my/topic")

# Start the MQTT client loop in the background
client.loop_start()

# Do other work here
print("Doing other work")

# Wait for a while
time.sleep(10)

# Stop the MQTT client loop
client.loop_stop()

在上面的代码中,我们在调用 client.loop_start() 后立即执行了其他代码,然后等待了 10 秒钟。在这期间,MQTT 客户端的事件循环仍在后台运行,并等待接收消息。

基于此,那么flask和mqtt联动其实不用上述的扩展库也可以,笔者在paho官方的demo上小改一下,最后结果如下所示:

"""
访问 localhost:5000/hello时,会用mqtt客户端发布主题消息
"""
from flask import Flask, jsonify  
import paho.mqtt.client as mqtt  
import sys  
  
  
app = Flask(__name__)  
print('[app] start work', file=sys.stdout)  
  
  
def connect_mqtt() -> mqtt:  
    def on_connect(client, userdata, flags, rc):  
        if rc == 0:  
            print("Connected to MQTT Broker!", file=sys.stdout)  
        else:  
            print("Failed to connect, return code %d\\n", rc, file=sys.stdout)  
  
    client = mqtt.Client()  
    client.username_pw_set("", "")  
    client.on_connect = on_connect  
    client.connect("broker.emqx.io", 1883, 60)  
    return client  
  
  
def subscribe(client: mqtt, topic):  
    def on_message(client, userdata, msg):  
        print(f"[mqtt] Received `msg.payload.decode()` from `msg.topic` topic", file=sys.stdout)  
  
    client.on_message = on_message  
    client.subscribe(topic)  
    print("[mqtt] subscribe topic")  
  
  
client = connect_mqtt()  
subscribe(client, "hello")  
client.loop_start()
  
@app.route("/hello")  
def alarm():  
    client.publish("hello", "important msg")  
    print("[mqtt] send msg successfully", file=sys.stdout)  
    return "Mqtt message published"  
  
  
if __name__ == '__main__':  
    app.run()

该代码经本人实际测试,也是可以正常食用的。需要说明的是,如果你只是需要发布而不需要监听,那么client.loop_start()不是必要的。

以上是关于《flask》flask+mqtt联动快速上手的主要内容,如果未能解决你的问题,请参考以下文章

flask 快速上手 预备知识

Flak快速上手

Flask

Flask快速入门

Flask快速入门

Flask-RESTful 快速构建TODO应用