mqtt 协议中订阅者的反馈消息
Posted
技术标签:
【中文标题】mqtt 协议中订阅者的反馈消息【英文标题】:Feedback message from subscriber in mqtt protocol 【发布时间】:2021-03-04 23:34:41 【问题描述】:我使用 MQTT 协议在两台计算机之间发送消息。我已经从这段代码中得到了模式。 发布者:
import paho.mqtt.client as mqtt
from random import randrange, uniform
import time
mqttBroker ="mqtt.eclipse.org"
client = mqtt.Client("Temperature_Inside")
client.connect(mqttBroker)
while True:
randNumber = randrange(10)
client.publish("TEMPERATURE", randNumber)
print("Just published " + str(randNumber) + " to topic TEMPERATURE")
time.sleep(1)
订阅者:
import paho.mqtt.client as mqtt
import time
def on_message(client, userdata, message):
print("received message: " ,str(message.payload.decode("utf-8")))
mqttBroker ="mqtt.eclipse.org"
client = mqtt.Client("Smartphone")
client.connect(mqttBroker)
client.loop_start()
client.subscribe("TEMPERATURE")
client.on_message=on_message
time.sleep(1)
client.loop_stop()
我希望在收到消息时向发布者发送反馈。有没有办法获得消息反馈?
【问题讨论】:
当你说反馈时,你到底是什么意思。您想要确认订阅者已收到消息,还是想要基于订阅者解析消息的具体反馈?我相信,如果您只是想确认它到达接收器,您可以使用 QoS(服务质量)和 mqtt 来做到这一点。否则我想你将不得不让发布者订阅一个单独的频道,订阅者在完成后在该频道上发布 ????????正是我想要确认订阅者已收到消息。 好的,我留下了一个答案来展示你如何做到这一点 【参考方案1】:MQTT 协议中没有端到端的传递通知。这是很刻意的。
MQTT 是一个发布/订阅系统,旨在将信息的生产者与消费者完全分开。当生产者发布消息时,可能有 0 到无限数量的主题订阅者。也可能有离线订阅者会在他们重新连接时传递消息(可能是在消息发布后的任何时间)
MQTT 提供的是 QOS 级别,但重要的是要记住,这些级别仅适用于交付过程的单程。例如。在 QOS 2 发布的消息确保它会到达代理,但不保证任何订阅者的订阅可能处于 QOS 0。
如果您的系统需要端到端传送通知,那么您需要自己实现响应消息,这通常是晚餐,通过在初始消息中包含唯一 ID 并在也包含该 ID 的不同主题中发送单独的响应消息
【讨论】:
【参考方案2】:为确保您的消息能够送达,您可以使用 QoS。这可以在发布或订阅时设置。因此,对于您的情况,您将需要 QoS 1 或 2。QoS 2 确保它在发布时恰好到达代理一次,并且如果以 QoS 2 订阅,它将确保订阅者仅获得一次消息。请注意,尽管 QoS 2 是最慢的发布和订阅形式。我发现处理消息最常见的方法是使用 QoS 1,然后在您的订阅 on_message 中您可以确定如何自己处理重复消息。 paho mqtt 客户端允许您在发布或订阅时设置 QoS,但默认为 0。我在下面更新了您的代码。
# publisher
import paho.mqtt.client as mqtt
from random import randrange, uniform
import time
import json
mqttBroker ="mqtt.eclipse.org"
client = mqtt.Client("Temperature_Inside")
client.connect(mqttBroker)
id = 1
while True:
randNumber = randrange(10)
message_dict = 'id': id, 'value': randNumber
client.publish("TEMPERATURE", json.dumps(message_dict), 1)
print("Just published " + str(randNumber) + " to topic TEMPERATURE")
id += 1
time.sleep(1)
# subscriber
import paho.mqtt.client as mqtt
import time
import json
from datetime import datetime
parsed_messages =
def on_message(client, userdata, message):
json_body = json.loads(str(message.payload.decode("utf-8")))
msg_id = json_body['id']
if msg_id in parsed_messages.keys
print("Message already recieved at: ", parsed_messages[msg_id].strftime("%H:%M:%S"))
else
print("received message: " ,json_body['value'])
parsed_messages[msg_key] = datetime.now()
mqttBroker ="mqtt.eclipse.org"
client = mqtt.Client("Smartphone")
client.connect(mqttBroker)
client.loop_start()
client.subscribe("TEMPERATURE", 1)
client.on_message=on_message
time.sleep(1)
client.loop_stop()
请注意,订阅者在订阅时还要定义 QoS 1,否则它将使用 paho 客户端的默认 QoS 0 进行订阅,并且消息将降级为 0,这意味着消息将最多传递一次(但如果数据包丢失,可能根本无法送达)。如上所述,仅确保消息将被订阅者接收。如果您希望发布者在订阅者处理完消息时收到通知,您将需要在订阅者完成发布者可以订阅的处理后发布一个新主题(带有一些 uuid)。然而,当我看到这样做时,我经常质疑为什么要使用 MQTT,而不仅仅是发送 HTTP 请求。如果您有兴趣,Here 是关于 MQTT QoS 的一个很好的链接(尽管它缺乏关于订阅方发生的事情的详细信息)。
【讨论】:
以上是关于mqtt 协议中订阅者的反馈消息的主要内容,如果未能解决你的问题,请参考以下文章