为啥脚本没有从 mosquitto 接收数据?

Posted

技术标签:

【中文标题】为啥脚本没有从 mosquitto 接收数据?【英文标题】:Why the script doesn't receive data from mosquitto?为什么脚本没有从 mosquitto 接收数据? 【发布时间】:2021-12-04 13:38:57 【问题描述】:

我有以下代码接收来自不同 MQTT 主题的数据。

#!/usr/bin/python3
import paho.mqtt.client as mqtt
import pymysql
import json
import sys
import time

#-### Constantes ####
PUERTO_MQTT = ...
USUARIO = ...
CLAVE = ...
#DB_LOCAL = ...
DB_LOCAL = ...
RETARDO_SUSCRIP = 30 #s

#-### Variables ####
idsNgsConectados = set()
clienteMQTT = mqtt.Client("receptor_datos_ngs")
db_local = None
t1 = 0
t2 = 0

#-### Funciones ####
def on_connect(clienteMQTT, userdata, flags, rc):
    print("Conexion establecida con el broker MQTT correctamente")

def on_message(clienteMQTT, userdata, msg):
    msg_str = msg.payload.decode('utf-8')

    # procesamiento de los mensajes MQTT
    posPrimeraBarra = msg.topic.find('/')

    print("topic: ".format(msg.topic))
    print("datos: ".format(msg_str))

    if posPrimeraBarra != -1:
        id_ngs = int(msg.topic[:posPrimeraBarra])
        ref_medicion = msg.topic[posPrimeraBarra+1:]
        variables = json.loads(msg_str)

        if type(variables) is dict:
            if id_ngs in idsNgsConectados:
                try:
                    cursor = db_local.cursor()
                    cursor.execute("INSERT INTO mediciones (id_ngs, referencia) VALUES (, '')".format(id_ngs, ref_medicion))
                    db_local.commit()
                    cursor.execute("SELECT id FROM mediciones WHERE (id_ngs= AND referencia='') ORDER BY id DESC LIMIT 1".format(id_ngs, ref_medicion))
                    id_medicion = cursor.fetchone()[0]
                    for var in variables:
                        cursor.execute("INSERT INTO valores_mediciones (id_medicion, variable, valor) VALUES (,'',)".format(id_medicion, var, variables[var]))
                    db_local.commit()
                    cursor.close()
                except Exception as e:
                    db_local.rollback()
                    print("Error 1: fallo el procesamiento de un mensaje MQTT: " + str(e), file=sys.stderr)
                    sys.exit(1)
            else:
                print("Advertencia: un ngs envio datos sin haberse presentado, por lo que se le indicara que salude primero", file=sys.stderr)
                clienteMQTT.publish(str(id_ngs),"saludar",qos=1)
        else:
            print("Advertencia: un ngs envio datos con un formato incorrecto por lo que se le indicara que se reinicie", file=sys.stderr)
            clienteMQTT.publish(str(id_ngs),"reiniciar",qos=1)

def sub_topics():
    global clienteMQTT
    cursor = db_local.cursor()
    cursor.execute('SELECT id FROM ngs')
    ids_ngs = cursor.fetchall()
    cursor.close()
    print("Topics suscriptos:")
    for id_ngs in ids_ngs:
        id_ngs = id_ngs[0]
        idsNgsConectados.add(id_ngs)
        topic = str(id_ngs) + '/#'
        print("\t".format(topic))
        clienteMQTT.subscribe(topic)

try:
    db_local = pymysql.connect( unix_socket='/run/mysqld/mysqld.sock',
                    user=USUARIO,
                    password=CLAVE,
                    db=DB_LOCAL )

    clienteMQTT.on_connect = on_connect
    clienteMQTT.on_message = on_message
    clienteMQTT.username_pw_set(username=USUARIO, password=CLAVE)

    clienteMQTT.connect("localhost", PUERTO_MQTT)
    clienteMQTT.loop_start()

    # Bucle infinito
    while True:
        time.sleep(RETARDO_SUSCRIP)
        # Suscripcion a los topics de todos los ngs
        sub_topics()
except Exception as e:
    db_local.close()
    clienteMQTT.loop_stop()
    clienteMQTT.disconnect()
    print("Error 2: desconocido: " + str(e), file=sys.stderr)
    sys.exit(2)

脚本动态订阅主题。如果我从 shell 运行脚本,它运行良好,但如果将它设置为使用 systemd 在启动时运行,它会失败。我已将单元文件设置为需要并在 mosquitto、mariadb、dhcpcd 和 wpa_supplicant 服务之后运行。我确定数据是从另一台设备发布的,因为我可以使用“mosquitto_sub”接收它。 可能是什么原因?

它在 Raspberry pi 零 w 中运行,Mosquitto 版本为 1.5.7。

【问题讨论】:

您是否有机会从环境变量中读取这些常量?还是它们是硬编码的?如果它们是环境变量,那么您必须确保这些变量是在 systemd 使用的环境中设置的。另外,如果您说脚本在手动运行时运行正常,那么我怀疑使用 systemd 运行时出现问题。您能否详细说明它是如何失败的? 这可能是您在某种虚拟环境中开发的一种情况,该环境的paho-mqtt 版本与在系统范围内安装并在使用systemd 运行时使用的版本不同 每 30 秒调用一次 sub_topics() 也不会做任何有用的事情,它应该只调用一次(可能在 on_connect() 回调中。 此代码旨在从 Mosquitto 接收与不同主题相关的数据,这些数据会随时间变化,这就是为什么我每 30 秒调用一次 sub_topics() 来订阅新主题并重新订阅旧主题的原因那些(我实现了重新订阅以查看是否可以解决问题),并将数据存储在mariadb 数据库中的特定表中。失败在于脚本在通过 systemd 执行时未存储任何数据,同时我可以看到 Mosquitto 使用 mosquitto_sub 作为客户端接收和传递数据。 大家好,感谢您的回答。我有好消息...我可以找到问题的原因!这不是因为mosquittopaho-mqtt,而是因为pymysql。代码中的失败是我没有在 sql 事务中使用方法commit。我将在单独的答案中更详细地解释这一点。 【参考方案1】:

我可以找到问题的原因,不是因为mosquittopaho-mqtt,而是因为pymysql。代码中的失败是我没有在 sql 事务中使用方法commit()。这导致代码无法获取数据库中另一个进程引入的新数据。这是因为 InnoDB 的默认隔离级别 REPEATABLE READ。您可以阅读更多关于 here 的信息。进行该 sql 事务的正确方法如下:

def sub_topics():
    cursor = db_local.cursor()
    cursor.execute('SELECT id FROM ngs')
    ids_ngs = cursor.fetchall()
    print("Topics suscriptos:")
    for id_ngs in ids_ngs:
        id_ngs = id_ngs[0]
        idsNgsConectados.add(id_ngs)
        topic = str(id_ngs) + '/#'
        print("\t".format(topic))
        clienteMQTT.subscribe(topic)
    db_local.commit()
    cursor.close()

此代码使用该数据来确定它必须订阅哪些主题,因此该代码没有订阅预期的主题,因此它没有从 mosquitto 接收相关数据。

pymysql 文档未涵盖该主题,因此我建议新手(我意识到我是 50% 新手 50% 专业 jaja)阅读有关 ACID 和您正在使用的 RDBMS 的实现细节。

【讨论】:

有趣的话题。很高兴你能解决它!

以上是关于为啥脚本没有从 mosquitto 接收数据?的主要内容,如果未能解决你的问题,请参考以下文章

MQTTX 接收不到订阅数据的排查

React Redux:为啥这个嵌套组件没有从 redux 状态接收道具?

Python 没有接收到来自 Arduino Mega 2560 的第一行串行数据,而是接收到所有后续数据,为啥会发生这种情况?

当我们从 UDP 服务器接收数据包时,为啥我们必须在单独的线程中接收它们?

为啥我从对等端接收的数据与预期输出不匹配?

为啥数据被推入通道但从未从接收器 goroutine 中读取?