python实现 MQTT订阅接收以及MySQL数据库存储

Posted wlwwwhl

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python实现 MQTT订阅接收以及MySQL数据库存储相关的知识,希望对你有一定的参考价值。

python实现 MQTT订阅、接收以及mysql数据库存储

简单物联网应用——基于老人居家声音监测系统

(1)数据接收程序使用 python 编写, 首先连接 MQTT 服务器, 订阅硬件数据发送的主题
“esp/test” , 将数据转换成 json 格式, 调用数据库存储函数 sqlsave(msgjson), MQTT 数据
接收程序如下:

# 连接 MQTT 服务器

def on_mqtt_connect():
mqttClient=mqtt.Client("pythontest")
mqttClient.connect(MQTTHOST, MQTTPORT,6)
mqttClient.loop_start()
def on_message_come(lient, userdata, msg):
get_data=msg.payload #bytes b'[s]
string=get_data.decode() #string
msgjson=json.loads(string)
print(msgjson)
sqlsave(msgjson)
#subscribe 消息

def on_subscribe():
mqttClient.subscribe(subscribe, qos=0)
mqttClient.on_message = on_message_come # 消息到来处理函数

(2)编写数据库保存函数 sqlsave(jsonData), 连接数据库, 并向 data_voice_sensor 表中插
入 MQTT 发送的数据:

def sqlsave(jsonData):
# 打开数据库连接
db = pymysql.connect(host="192.168.174.128",
user="root",password="password",database="test",charset='utf8')
cursor = db.cursor() # 使用 cursor()方法获取操作游标
# SQL 插入语句
sql = "INSERT INTO data_voice_sensor (get_time,sensorType,device,get_data,get_value)
VALUES ('%s','%s','%s','%s','%s');"\\
%(jsonData['get_time'],jsonData['sensorType'],
jsonData['get_data'],jsonData['device'],jsonData['get_value'],)
cursor.execute(sql)
db.commit()
db.close()

上述代码在我的电脑里面无缘无故不能订阅MQTT主题了,目前没找到原因,但是换一台电脑是可以的
下面这个完整的有两个,第二个是目前在我电脑上面可以跑起来的,请高手指教!

一、

import paho.mqtt.client as mqtt
import json
import pymysql

MQTTHOST = "192.168.43.188"
MQTTPORT = 1883
mqttClient = mqtt.Client()
subscribe ="esp/test"

#MySQL保存
def sqlsave(jsonData):
  # 打开数据库连接
  db = pymysql.connect(host="192.168.174.128",user="root",password="password",database="test",charset='utf8')
  # 使用cursor()方法获取操作游标 
  cursor = db.cursor()
  # SQL 插入语句
  sql = "INSERT INTO data_voice_sensor (get_time,sensorType,device,get_data,get_value) \\
        VALUES ('%s','%s','%s','%s','%s');"\\
            %(jsonData['get_time'],jsonData['sensorType'],jsonData['get_data'],jsonData['device'],jsonData['get_value'],)
  cursor.execute(sql)
  db.commit()
  print("数据库保存成功!")
  # 关闭数据库连接
  db.close()

# 连接MQTT服务器
def on_mqtt_connect():
  mqttClient=mqtt.Client("pythontest")
  mqttClient.connect(MQTTHOST, MQTTPORT,60)
  mqttClient.loop_start()

def on_message_come(lient, userdata, msg):
    get_data=msg.payload #bytes  b'[s]
    print(get_data)


    string=get_data.decode()  #string
    print(string)
    msgjson=json.loads(string)
    print(msgjson)
    sqlsave(msgjson)


# subscribe 消息
def on_subscribe():
  mqttClient.subscribe(subscribe, qos=0)
  mqttClient.on_message = on_message_come # 消息到来处理函数
  

def main():
  on_mqtt_connect()
  while True:
    on_subscribe()
if __name__ == '__main__':
  main()

二、

#!/usr/bin/python
# -*- coding: utf-8 -*

import paho.mqtt.client as mqtt
import json
import pymysql
import time
def gettime():
    time1=time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
    return time1

# 服务器地址
host = '192.168.43.188'
# 通信端口 默认端口1883
port = 1883


username = ''
password = ''

# 订阅主题名
topic = 'esp/test'


# 连接后事件
def on_connect(client, userdata, flags, respons_code):
    if respons_code == 0:
        # 连接成功
        print('Connection Succeed!')
    else:
        # 连接失败并显示错误代码
        print('Connect Error status {0}'.format(respons_code))
    # 订阅信息
    client.subscribe(topic)


# 接收到数据后事件
def on_message(client, userdata, msg):
    # 打印订阅消息主题
    # print("topic", msg.topic)
    # 打印消息数据
    jsondata=json.loads(msg.payload)
    print("msg payload", jsondata)
    sqlsave(jsondata)

def main_demo():
    client = mqtt.Client()
    # 注册事件
    client.on_connect = on_connect
    client.on_message = on_message
    # 设置账号密码(如果需要的话)
    client.username_pw_set(username, password=password)
    # 连接到服务器
    client.connect(host, port=port, keepalive=60)
    # 守护连接状态
    client.loop_forever()

#MySQL保存
def sqlsave(jsonData):
      # 打开数据库连接
    db = pymysql.connect(host="192.168.174.128",user="root",password="password",database="test",charset='utf8')
    # 使用cursor()方法获取操作游标 
    cursor = db.cursor()
    # SQL 插入语句
    sql = "INSERT INTO data_voice (get_time,sensorType,device,get_data,get_value) \\
            VALUES ('%s','%s','%s','%s','%s');"\\
                %(gettime(),jsonData['sensorType'],jsonData['get_data'],jsonData['device'],jsonData['get_value'],)
    cursor.execute(sql)
    db.commit()
    print("数据库保存成功!")
    # 关闭数据库连接
    db.close()


if __name__ == '__main__':
    main_demo()



以上是关于python实现 MQTT订阅接收以及MySQL数据库存储的主要内容,如果未能解决你的问题,请参考以下文章

Android使用MQTT订阅及发布消息(初步了解Mqtt以及实现Android操作mqtt服务)

使用python实现MQTT发布订阅

paho-mqtt实现多客户端订阅一个主题,并保证消息只被接收一次

python使用MQTT协议发送订阅消息

SpringBoot2.x集成MQTT实现消息订阅(附源码)

PAHO MQTT Python 客户端 - 缺少确认,保证为订阅者交付