PAHO MQTT Python 客户端 - 缺少确认,保证为订阅者交付
Posted
技术标签:
【中文标题】PAHO MQTT Python 客户端 - 缺少确认,保证为订阅者交付【英文标题】:PAHO MQTT Python client - acknowledgement missing, guaranteed delivery for subscriber 【发布时间】:2017-10-29 09:07:10 【问题描述】:各位开发者,
我正在查看 Paho MQTT 客户端(用于 Python),我想我了解发布者的 QoS 设置,例如传感器或任何数据源确实有意义 - 毕竟,您希望能够确保(由代理/服务器)接收到消息。
我也认为从订阅者的角度来看请求是有意义的,例如QoS“2”以确保 MQTT 服务器确实将每条消息发送给订阅者,但我正在为此苦苦挣扎:订阅者似乎没有办法发出成功(或不)处理接收到的消息的信号消息,换句话说,似乎缺少某种显式确认的方式?
用例 - 成功处理消息
我想订阅一个主题并成功处理每个数据点,例如通过存储到数据库。因此,我需要适应订户“进行中”失败的情况,即在从代理收到消息之后但在成功处理(存储到数据库)之前。
假设
现在,如果订阅者(固定 client_id)在处理数据时失败,它将重新启动,然后重新连接到 MQTT 代理,代理通过 id 识别此特定客户端并再次开始推送消息 - 从订阅者断开连接后的下一条消息开始- 就代理而言,它确实成功地传递了最后一条消息(不知道订阅者崩溃了)。
可能的解决方案
如果上述假设成立,那么我最好不要使用固定的client_id,而是使用“clean_session”和随机的client_id;通过这种方式,代理开始传递针对特定主题的所有消息。这当然会将跟踪成功处理的消息的责任放在订阅者身上。
这是需要做的吗?或者有没有办法明确地确认订阅者成功处理了一条消息,所以代理可以重新传输应该需要 - 我对 Paho Python 库特别感兴趣。
提前致谢!
编辑 1:
相关代码:
def _handle_on_message(self, message):
matched = False
with self._callback_mutex:
try:
topic = message.topic
except UnicodeDecodeError:
topic = None
if topic is not None:
for callback in self._on_message_filtered.iter_match(message.topic):
with self._in_callback:
callback(self, self._userdata, message)
matched = True
if matched == False and self.on_message:
with self._in_callback:
self.on_message(self, self._userdata, message)
来源:https://github.com/eclipse/paho.mqtt.python/blob/v1.3.1/src/paho/mqtt/client.py#L2631
编辑 2:
@hardlib 确实是正确的——在回调函数内部失败时,客户端代码不会向代理确认
一些用于说明的代码
on_message
订阅者回调
def on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.payload)+ " mid:" + str(msg.mid))
num = ''.join(x for x in str(msg.payload) if x.isdigit())
if int(num) % 3 == 0:
print("Going to show myself out now (sys.exit(1))")
time.sleep(1)
sys.exit(1)
出版商
while True:
count += 1
logging.debug("At: " + str(count))
msg = "message: counter".format(counter=count)
mqttc.publish("paho/stacko", msg, qos=2, retain=False)
日志
出版商
root@14f00c2576b2:/usr/src/app# python publisher.py
DEBUG:root:Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b''
DEBUG:root:At: 1
DEBUG:root:Sending PUBLISH (d0, q2, r0, m1), 'b'paho/stacko'', ... (10 bytes)
DEBUG:root:Received CONNACK (0, 0)
DEBUG:root:Received PUBREC (Mid: 1)
DEBUG:root:Sending PUBREL (Mid: 1)
DEBUG:root:Received PUBCOMP (Mid: 1)
DEBUG:root:At: 2
DEBUG:root:Sending PUBLISH (d0, q2, r0, m2), 'b'paho/stacko'', ... (10 bytes)
DEBUG:root:Received PUBREC (Mid: 2)
DEBUG:root:Sending PUBREL (Mid: 2)
DEBUG:root:Received PUBCOMP (Mid: 2)
DEBUG:root:At: 3
DEBUG:root:Sending PUBLISH (d0, q2, r0, m3), 'b'paho/stacko'', ... (10 bytes)
DEBUG:root:Received PUBREC (Mid: 3)
DEBUG:root:Sending PUBREL (Mid: 3)
DEBUG:root:Received PUBCOMP (Mid: 3)
DEBUG:root:At: 4
DEBUG:root:Sending PUBLISH (d0, q2, r0, m4), 'b'paho/stacko'', ... (10 bytes)
DEBUG:root:Received PUBREC (Mid: 4)
DEBUG:root:Sending PUBREL (Mid: 4)
DEBUG:root:Received PUBCOMP (Mid: 4)
DEBUG:root:At: 5
DEBUG:root:Sending PUBLISH (d0, q2, r0, m5), 'b'paho/stacko'', ... (10 bytes)
DEBUG:root:Received PUBREC (Mid: 5)
DEBUG:root:Sending PUBREL (Mid: 5)
DEBUG:root:Received PUBCOMP (Mid: 5)
DEBUG:root:At: 6
DEBUG:root:Sending PUBLISH (d0, q2, r0, m6), 'b'paho/stacko'', ... (10 bytes)
DEBUG:root:Received PUBREC (Mid: 6)
DEBUG:root:Sending PUBREL (Mid: 6)
DEBUG:root:Received PUBCOMP (Mid: 6)
DEBUG:root:At: 7
DEBUG:root:Sending PUBLISH (d0, q2, r0, m7), 'b'paho/stacko'', ... (10 bytes)
DEBUG:root:Received PUBREC (Mid: 7)
DEBUG:root:Sending PUBREL (Mid: 7)
DEBUG:root:Received PUBCOMP (Mid: 7)
DEBUG:root:At: 8
DEBUG:root:Sending PUBLISH (d0, q2, r0, m8), 'b'paho/stacko'', ... (10 bytes)
DEBUG:root:Received PUBREC (Mid: 8)
DEBUG:root:Sending PUBREL (Mid: 8)
DEBUG:root:Received PUBCOMP (Mid: 8)
DEBUG:root:At: 9
DEBUG:root:Sending PUBLISH (d0, q2, r0, m9), 'b'paho/stacko'', ... (10 bytes)
DEBUG:root:Received PUBREC (Mid: 9)
DEBUG:root:Sending PUBREL (Mid: 9)
DEBUG:root:Received PUBCOMP (Mid: 9)
DEBUG:root:At: 10
DEBUG:root:Sending PUBLISH (d0, q2, r0, m10), 'b'paho/stacko'', ... (11 bytes)
DEBUG:root:Received PUBREC (Mid: 10)
DEBUG:root:Sending PUBREL (Mid: 10)
DEBUG:root:Received PUBCOMP (Mid: 10)
DEBUG:root:At: 11
DEBUG:root:Sending PUBLISH (d0, q2, r0, m11), 'b'paho/stacko'', ... (11 bytes)
DEBUG:root:Received PUBREC (Mid: 11)
DEBUG:root:Sending PUBREL (Mid: 11)
DEBUG:root:Received PUBCOMP (Mid: 11)
订阅者
root@ca7dcaaed68f:/usr/src/app# python subscriber.py
DEBUG:root:Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=b'client_02'
DEBUG:root:Received CONNACK (0, 0)
DEBUG:root:Connected
Connected with result code 0
DEBUG:root:Sending SUBSCRIBE (d0) [(b'#', 2)]
DEBUG:root:Received SUBACK
DEBUG:root:Received PUBLISH (d0, q2, r0, m1), 'paho/stacko', ... (10 bytes)
DEBUG:root:Sending PUBREC (Mid: 1)
DEBUG:root:Received PUBREL (Mid: 1)
DEBUG:root:Message!!!! b'message: 2'
paho/stacko b'message: 2' mid:1
Num: 2
DEBUG:root:Sending PUBCOMP (Mid: 1)
DEBUG:root:Received PUBLISH (d0, q2, r0, m2), 'paho/stacko', ... (10 bytes)
DEBUG:root:Sending PUBREC (Mid: 2)
DEBUG:root:Received PUBREL (Mid: 2)
DEBUG:root:Message!!!! b'message: 3'
paho/stacko b'message: 3' mid:2
Num: 3
Going to show myself out now (sys.exit(1))
root@ca7dcaaed68f:/usr/src/app# python subscriber.py
DEBUG:root:Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=b'client_02'
DEBUG:root:Received CONNACK (1, 0)
DEBUG:root:Connected
Connected with result code 0
DEBUG:root:Sending SUBSCRIBE (d0) [(b'#', 2)]
DEBUG:root:Received PUBREL (Mid: 2)
DEBUG:root:Received PUBLISH (d0, q2, r0, m3), 'paho/stacko', ... (10 bytes)
DEBUG:root:Sending PUBREC (Mid: 3)
DEBUG:root:Received PUBLISH (d0, q2, r0, m4), 'paho/stacko', ... (10 bytes)
DEBUG:root:Sending PUBREC (Mid: 4)
DEBUG:root:Received SUBACK
DEBUG:root:Received PUBREL (Mid: 3)
DEBUG:root:Message!!!! b'message: 4'
paho/stacko b'message: 4' mid:3
Num: 4
DEBUG:root:Sending PUBCOMP (Mid: 3)
DEBUG:root:Received PUBREL (Mid: 4)
DEBUG:root:Message!!!! b'message: 5'
paho/stacko b'message: 5' mid:4
Num: 5
DEBUG:root:Sending PUBCOMP (Mid: 4)
DEBUG:root:Received PUBLISH (d0, q2, r0, m5), 'paho/stacko', ... (10 bytes)
DEBUG:root:Sending PUBREC (Mid: 5)
DEBUG:root:Received PUBREL (Mid: 5)
DEBUG:root:Message!!!! b'message: 6'
paho/stacko b'message: 6' mid:5
Num: 6
Going to show myself out now (sys.exit(1))
root@ca7dcaaed68f:/usr/src/app# python subscriber.py
DEBUG:root:Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=b'client_02'
DEBUG:root:Received CONNACK (1, 0)
DEBUG:root:Connected
Connected with result code 0
DEBUG:root:Sending SUBSCRIBE (d0) [(b'#', 2)]
DEBUG:root:Received PUBREL (Mid: 2)
DEBUG:root:Received PUBREL (Mid: 5)
DEBUG:root:Received SUBACK
DEBUG:root:Received PUBLISH (d0, q2, r0, m6), 'paho/stacko', ... (10 bytes)
DEBUG:root:Sending PUBREC (Mid: 6)
DEBUG:root:Received PUBREL (Mid: 6)
DEBUG:root:Message!!!! b'message: 7'
paho/stacko b'message: 7' mid:6
Num: 7
DEBUG:root:Sending PUBCOMP (Mid: 6)
DEBUG:root:Received PUBREL (Mid: 2)
DEBUG:root:Received PUBLISH (d0, q2, r0, m7), 'paho/stacko', ... (10 bytes)
DEBUG:root:Sending PUBREC (Mid: 7)
DEBUG:root:Received PUBREL (Mid: 7)
DEBUG:root:Message!!!! b'message: 8'
paho/stacko b'message: 8' mid:7
Num: 8
DEBUG:root:Sending PUBCOMP (Mid: 7)
DEBUG:root:Received PUBLISH (d0, q2, r0, m8), 'paho/stacko', ... (10 bytes)
DEBUG:root:Sending PUBREC (Mid: 8)
DEBUG:root:Received PUBREL (Mid: 8)
DEBUG:root:Message!!!! b'message: 9'
paho/stacko b'message: 9' mid:8
Num: 9
Going to show myself out now (sys.exit(1))
root@ca7dcaaed68f:/usr/src/app# python subscriber.py
DEBUG:root:Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=b'client_02'
DEBUG:root:Received CONNACK (1, 0)
DEBUG:root:Connected
Connected with result code 0
DEBUG:root:Sending SUBSCRIBE (d0) [(b'#', 2)]
DEBUG:root:Received PUBREL (Mid: 2)
DEBUG:root:Received PUBREL (Mid: 5)
DEBUG:root:Received PUBREL (Mid: 8)
DEBUG:root:Received SUBACK
DEBUG:root:Received PUBLISH (d0, q2, r0, m9), 'paho/stacko', ... (11 bytes)
DEBUG:root:Sending PUBREC (Mid: 9)
DEBUG:root:Received PUBREL (Mid: 9)
DEBUG:root:Message!!!! b'message: 10'
paho/stacko b'message: 10' mid:9
Num: 10
DEBUG:root:Sending PUBCOMP (Mid: 9)
DEBUG:root:Received PUBREL (Mid: 5)
DEBUG:root:Received PUBLISH (d0, q2, r0, m10), 'paho/stacko', ... (11 bytes)
DEBUG:root:Sending PUBREC (Mid: 10)
DEBUG:root:Received PUBREL (Mid: 10)
DEBUG:root:Message!!!! b'message: 11'
paho/stacko b'message: 11' mid:10
Num: 11
DEBUG:root:Sending PUBCOMP (Mid: 10)
DEBUG:root:Received PUBREL (Mid: 2)
结论(暂时)
MQTT(至少 Mosquitto)在持久性方面确实按预期工作:如果客户端重新连接,它可以“追赶”自上次连接以来错过的那些消息。但是,即使为订阅者和发布者都设置了qos=2
,崩溃前的最后一条消息也不会被重新处理
【问题讨论】:
您的假设是错误的,使用随机客户端 ID,代理不会向“新”客户端发送任何旧消息,因为它没有任何迹象表明它应该为该客户端保留消息 【参考方案1】:如果您的客户端在处理消息时可能会失败,您可以在开始处理之前将消息存储在某个地方(可能在磁盘或数据库中)。
然后,您可以在客户端重新启动时检查此存储并尝试再次处理它。如果您在重新连接之前执行此操作(使用固定的 client_id),那么您在尝试处理失败消息时不必担心新消息会被传递。
编辑:
还要查看代码的更详细信息,对于 QOS2,交付确认的最后一段似乎仅在 on_message
完成后发送,因此如果您在处理 on_message
中的消息时崩溃,那么代理应该重新传递该消息。
https://github.com/eclipse/paho.mqtt.python/blob/e9914a759f9f5b8081d59fd65edfd18d229a399e/src/paho/mqtt/client.py#L2506
【讨论】:
re:“在某处存储消息” - 不确定这有什么帮助?问题仍然存在:一旦将消息传递给订阅者,发布者就会认为其工作已完成并继续 re: "confirmation sent" - 您链接到的代码是客户端发布到代理的部分 - 我更关心客户端订阅的部分到一个主题并接收来自服务器的消息 我不明白你的第一条评论,一旦调用on_message
,订阅者就应该将消息存储在本地(到磁盘),直到它处理完它才能在重启时恢复。
工作备份堆栈,链接的代码处理传入消息未发布
@hardlib - 你是对的,事实上是 2 倍!! 1. 尽管名称为handle_publish
- MQTT Paho 客户端的这一部分是关于处理消息的接收端。 2.确实,当on_message
回调失败时,客户端不会发出最终确认(在qos=2的情况下)以上是关于PAHO MQTT Python 客户端 - 缺少确认,保证为订阅者交付的主要内容,如果未能解决你的问题,请参考以下文章
python paho mqtt客户端连接通过ssl/tls给出错误