为啥脚本没有从 mosquitto 接收数据?
Posted
技术标签:
【中文标题】为啥脚本没有从 mosquitto 接收数据?【英文标题】:Why the script doesn't receive data from mosquitto?为什么脚本没有从 mosquitto 接收数据? 【发布时间】:2021-12-04 13:38:57 【问题描述】:我有以下代码接收来自不同 MQTT 主题的数据。
#!/usr/bin/python3
import paho.mqtt.client as mqtt
import pymysql
import json
import sys
import time
#-### Constantes ####
PUERTO_MQTT = ...
USUARIO = ...
CLAVE = ...
#DB_LOCAL = ...
DB_LOCAL = ...
RETARDO_SUSCRIP = 30 #s
#-### Variables ####
idsNgsConectados = set()
clienteMQTT = mqtt.Client("receptor_datos_ngs")
db_local = None
t1 = 0
t2 = 0
#-### Funciones ####
def on_connect(clienteMQTT, userdata, flags, rc):
print("Conexion establecida con el broker MQTT correctamente")
def on_message(clienteMQTT, userdata, msg):
msg_str = msg.payload.decode('utf-8')
# procesamiento de los mensajes MQTT
posPrimeraBarra = msg.topic.find('/')
print("topic: ".format(msg.topic))
print("datos: ".format(msg_str))
if posPrimeraBarra != -1:
id_ngs = int(msg.topic[:posPrimeraBarra])
ref_medicion = msg.topic[posPrimeraBarra+1:]
variables = json.loads(msg_str)
if type(variables) is dict:
if id_ngs in idsNgsConectados:
try:
cursor = db_local.cursor()
cursor.execute("INSERT INTO mediciones (id_ngs, referencia) VALUES (, '')".format(id_ngs, ref_medicion))
db_local.commit()
cursor.execute("SELECT id FROM mediciones WHERE (id_ngs= AND referencia='') ORDER BY id DESC LIMIT 1".format(id_ngs, ref_medicion))
id_medicion = cursor.fetchone()[0]
for var in variables:
cursor.execute("INSERT INTO valores_mediciones (id_medicion, variable, valor) VALUES (,'',)".format(id_medicion, var, variables[var]))
db_local.commit()
cursor.close()
except Exception as e:
db_local.rollback()
print("Error 1: fallo el procesamiento de un mensaje MQTT: " + str(e), file=sys.stderr)
sys.exit(1)
else:
print("Advertencia: un ngs envio datos sin haberse presentado, por lo que se le indicara que salude primero", file=sys.stderr)
clienteMQTT.publish(str(id_ngs),"saludar",qos=1)
else:
print("Advertencia: un ngs envio datos con un formato incorrecto por lo que se le indicara que se reinicie", file=sys.stderr)
clienteMQTT.publish(str(id_ngs),"reiniciar",qos=1)
def sub_topics():
global clienteMQTT
cursor = db_local.cursor()
cursor.execute('SELECT id FROM ngs')
ids_ngs = cursor.fetchall()
cursor.close()
print("Topics suscriptos:")
for id_ngs in ids_ngs:
id_ngs = id_ngs[0]
idsNgsConectados.add(id_ngs)
topic = str(id_ngs) + '/#'
print("\t".format(topic))
clienteMQTT.subscribe(topic)
try:
db_local = pymysql.connect( unix_socket='/run/mysqld/mysqld.sock',
user=USUARIO,
password=CLAVE,
db=DB_LOCAL )
clienteMQTT.on_connect = on_connect
clienteMQTT.on_message = on_message
clienteMQTT.username_pw_set(username=USUARIO, password=CLAVE)
clienteMQTT.connect("localhost", PUERTO_MQTT)
clienteMQTT.loop_start()
# Bucle infinito
while True:
time.sleep(RETARDO_SUSCRIP)
# Suscripcion a los topics de todos los ngs
sub_topics()
except Exception as e:
db_local.close()
clienteMQTT.loop_stop()
clienteMQTT.disconnect()
print("Error 2: desconocido: " + str(e), file=sys.stderr)
sys.exit(2)
脚本动态订阅主题。如果我从 shell 运行脚本,它运行良好,但如果将它设置为使用 systemd 在启动时运行,它会失败。我已将单元文件设置为需要并在 mosquitto、mariadb、dhcpcd 和 wpa_supplicant 服务之后运行。我确定数据是从另一台设备发布的,因为我可以使用“mosquitto_sub”接收它。 可能是什么原因?
它在 Raspberry pi 零 w 中运行,Mosquitto 版本为 1.5.7。
【问题讨论】:
您是否有机会从环境变量中读取这些常量?还是它们是硬编码的?如果它们是环境变量,那么您必须确保这些变量是在 systemd 使用的环境中设置的。另外,如果您说脚本在手动运行时运行正常,那么我怀疑使用 systemd 运行时出现问题。您能否详细说明它是如何失败的? 这可能是您在某种虚拟环境中开发的一种情况,该环境的paho-mqtt
版本与在系统范围内安装并在使用systemd
运行时使用的版本不同
每 30 秒调用一次 sub_topics()
也不会做任何有用的事情,它应该只调用一次(可能在 on_connect()
回调中。
此代码旨在从 Mosquitto 接收与不同主题相关的数据,这些数据会随时间变化,这就是为什么我每 30 秒调用一次 sub_topics() 来订阅新主题并重新订阅旧主题的原因那些(我实现了重新订阅以查看是否可以解决问题),并将数据存储在mariadb
数据库中的特定表中。失败在于脚本在通过 systemd 执行时未存储任何数据,同时我可以看到 Mosquitto 使用 mosquitto_sub
作为客户端接收和传递数据。
大家好,感谢您的回答。我有好消息...我可以找到问题的原因!这不是因为mosquitto
或paho-mqtt
,而是因为pymysql
。代码中的失败是我没有在 sql 事务中使用方法commit
。我将在单独的答案中更详细地解释这一点。
【参考方案1】:
我可以找到问题的原因,不是因为mosquitto
或paho-mqtt
,而是因为pymysql
。代码中的失败是我没有在 sql 事务中使用方法commit()
。这导致代码无法获取数据库中另一个进程引入的新数据。这是因为 InnoDB 的默认隔离级别 REPEATABLE READ。您可以阅读更多关于 here 的信息。进行该 sql 事务的正确方法如下:
def sub_topics():
cursor = db_local.cursor()
cursor.execute('SELECT id FROM ngs')
ids_ngs = cursor.fetchall()
print("Topics suscriptos:")
for id_ngs in ids_ngs:
id_ngs = id_ngs[0]
idsNgsConectados.add(id_ngs)
topic = str(id_ngs) + '/#'
print("\t".format(topic))
clienteMQTT.subscribe(topic)
db_local.commit()
cursor.close()
此代码使用该数据来确定它必须订阅哪些主题,因此该代码没有订阅预期的主题,因此它没有从 mosquitto 接收相关数据。
pymysql
文档未涵盖该主题,因此我建议新手(我意识到我是 50% 新手 50% 专业 jaja)阅读有关 ACID 和您正在使用的 RDBMS 的实现细节。
【讨论】:
有趣的话题。很高兴你能解决它!以上是关于为啥脚本没有从 mosquitto 接收数据?的主要内容,如果未能解决你的问题,请参考以下文章
React Redux:为啥这个嵌套组件没有从 redux 状态接收道具?
Python 没有接收到来自 Arduino Mega 2560 的第一行串行数据,而是接收到所有后续数据,为啥会发生这种情况?