Python:MQTT代理消息批量插入mysql数据库

Posted

技术标签:

【中文标题】Python:MQTT代理消息批量插入mysql数据库【英文标题】:Python: MQTT broker messages bulk insert into mysql database 【发布时间】:2018-05-06 04:55:00 【问题描述】:

我已经使用 paho mqtt 客户端订阅了多个主题。在收到来自代理的消息后,我想将消息存储到 mysql 数据库中。我想在插入 DB 之前完全收集消息。我已经设置了阈值,比如 1000 条消息。仅当达到阈值时,才必须将消息一次全部插入数据库。我在 cursor.execute() 之后检查 row_count。但它显示计数为 1。因此没有发生批量插入。这是我的示例代码 sn-p

//main.py

#mysql database class
db = MySQLDBClass()

#mqtt client class where subscription,connection to broker,some callbacks   
mqttclient = MyMQTTClient() 
mqttclient.on_message = db.onMessage
mqttclient.loop_forever()

//MySQLDBClass.py

 def __init__(self):

        self.insertcounter = 0
        self.insertStatement = ''
        self.bulkpayload = ''
        self.maxInsert = 1000

    def onMessage(self, client, userdata, msg):

        if  msg.topic.startswith("topic1/"):
            self.bulkpayload += "(" + msg.payload.decode("utf-8") + "," + datetime + "),"
        elif msg.topic.startswith("topic2/"):
            self.insertStatement += "INSERT INTO mydatabase.table1 VALUES (" + msg.payload.decode("utf-8") + "," + datetime + ");"
        elif msg.topic.startswith("topic3/")   
            self.insertStatement += "INSERT INTO mydatabase.table2 VALUES (" +msg.payload.decode("utf-8") + "," + datetime + ");"
        elif msg.topic.startswith("messages"):
            self.insertStatement += "INSERT INTO mydatabase.table3 VALUES ('" + msg.topic + "',"  + msg.payload.decode("utf-8") + "," + datetime + ");"
        else:
            return  # do not store in DB

        self.insertcounter += 1 

        if ( self.insertcounter > self.maxInsert ): 
            if ( self.bulkpayload != '' ):
                self.insertStatement += "INSERT INTO mydatabase.table4 VALUES" + self.bulkpayload + ";"    
                self.bulkpayload = ''

            cursor.execute(self.insertStatement)
            cursor.commit()
            print (cursor.rowcount) #prints always count as one , expecting bulk count 
            self.insertcounter  = 0
            self.insertStatement = ''

【问题讨论】:

【参考方案1】:

使用 pymysql 模块,execute 一次只能执行一个查询,而通过使用 mysql-connector-python,我们可以在 execute(mutli=true) 中设置 multi=True 来执行多条语句。 https://dev.mysql.com/doc/connector-python/en/connector-python-api-mysqlcursor-execute.html

【讨论】:

以上是关于Python:MQTT代理消息批量插入mysql数据库的主要内容,如果未能解决你的问题,请参考以下文章

一个 MQTT 代理可以处理多少个客户端?

压力测试 mqtt 代理(HiveMQ CE 代理)

VernelMQ —— Erlang 分布式 MQTT 消息代理

如何从网站向不支持 Websockets 的 MQTT 代理发送消息?

mqtt:避免缓冲消息

通过 mosquitto 了解 MQTT协议