mqtt 协议中订阅者的反馈消息

Posted

技术标签:

【中文标题】mqtt 协议中订阅者的反馈消息【英文标题】:Feedback message from subscriber in mqtt protocol 【发布时间】:2021-03-04 23:34:41 【问题描述】:

我使用 MQTT 协议在两台计算机之间发送消息。我已经从这段代码中得到了模式。 发布者:

import paho.mqtt.client as mqtt
from random import randrange, uniform
import time

mqttBroker ="mqtt.eclipse.org" 

client = mqtt.Client("Temperature_Inside")
client.connect(mqttBroker) 

while True:
    randNumber = randrange(10)

    client.publish("TEMPERATURE", randNumber)
    print("Just published " + str(randNumber) + " to topic TEMPERATURE")
    time.sleep(1)

订阅者:

import paho.mqtt.client as mqtt
import time

def on_message(client, userdata, message):
    print("received message: " ,str(message.payload.decode("utf-8")))

mqttBroker ="mqtt.eclipse.org"

client = mqtt.Client("Smartphone")
client.connect(mqttBroker) 

client.loop_start()

client.subscribe("TEMPERATURE")
client.on_message=on_message 

time.sleep(1)
client.loop_stop()

我希望在收到消息时向发布者发送反馈。有没有办法获得消息反馈?

【问题讨论】:

当你说反馈时,你到底是什么意思。您想要确认订阅者已收到消息,还是想要基于订阅者解析消息的具体反馈?我相信,如果您只是想确认它到达接收器,您可以使用 QoS(服务质量)和 mqtt 来做到这一点。否则我想你将不得不让发布者订阅一个单独的频道,订阅者在完成后在该频道上发布 ????????正是我想要确认订阅者已收到消息。 好的,我留下了一个答案来展示你如何做到这一点 【参考方案1】:

MQTT 协议中没有端到端的传递通知。这是很刻意的。

MQTT 是一个发布/订阅系统,旨在将信息的生产者与消费者完全分开。当生产者发布消息时,可能有 0 到无限数量的主题订阅者。也可能有离线订阅者会在他们重新连接时传递消息(可能是在消息发布后的任何时间)

MQTT 提供的是 QOS 级别,但重要的是要记住,这些级别仅适用于交付过程的单程。例如。在 QOS 2 发布的消息确保它会到达代理,但不保证任何订阅者的订阅可能处于 QOS 0。

如果您的系统需要端到端传送通知,那么您需要自己实现响应消息,这通常是晚餐,通过在初始消息中包含唯一 ID 并在也包含该 ID 的不同主题中发送单独的响应消息

【讨论】:

【参考方案2】:

为确保您的消息能够送达,您可以使用 QoS。这可以在发布或订阅时设置。因此,对于您的情况,您将需要 QoS 1 或 2。QoS 2 确保它在发布时恰好到达代理一次,并且如果以 QoS 2 订阅,它将确保订阅者仅获得一次消息。请注意,尽管 QoS 2 是最慢的发布和订阅形式。我发现处理消息最常见的方法是使用 QoS 1,然后在您的订阅 on_message 中您可以确定如何自己处理重复消息。 paho mqtt 客户端允许您在发布或订阅时设置 QoS,但默认为 0。我在下面更新了您的代码。

# publisher
import paho.mqtt.client as mqtt
from random import randrange, uniform
import time
import json

mqttBroker ="mqtt.eclipse.org" 

client = mqtt.Client("Temperature_Inside")
client.connect(mqttBroker) 

id = 1
while True:
    randNumber = randrange(10)
    message_dict =  'id': id, 'value': randNumber 

    client.publish("TEMPERATURE", json.dumps(message_dict), 1)
    print("Just published " + str(randNumber) + " to topic TEMPERATURE")
    id += 1
    time.sleep(1)

# subscriber
import paho.mqtt.client as mqtt
import time
import json
from datetime import datetime

parsed_messages = 

def on_message(client, userdata, message):
    json_body = json.loads(str(message.payload.decode("utf-8")))
    msg_id = json_body['id']
    if msg_id in parsed_messages.keys
        print("Message already recieved at: ", parsed_messages[msg_id].strftime("%H:%M:%S"))
    else
        print("received message: " ,json_body['value'])
        parsed_messages[msg_key] = datetime.now()
    

mqttBroker ="mqtt.eclipse.org"

client = mqtt.Client("Smartphone")
client.connect(mqttBroker) 

client.loop_start()

client.subscribe("TEMPERATURE", 1)
client.on_message=on_message 

time.sleep(1)
client.loop_stop()

请注意,订阅者在订阅时还要定义 QoS 1,否则它将使用 paho 客户端的默认 QoS 0 进行订阅,并且消息将降级为 0,这意味着消息将最多传递一次(但如果数据包丢失,可能根本无法送达)。如上所述,仅确保消息将被订阅者接收。如果您希望发布者在订阅者处理完消息时收到通知,您将需要在订阅者完成发布者可以订阅的处理后发布一个新主题(带有一些 uuid)。然而,当我看到这样做时,我经常质疑为什么要使用 MQTT,而不仅仅是发送 HTTP 请求。如果您有兴趣,Here 是关于 MQTT QoS 的一个很好的链接(尽管它缺乏关于订阅方发生的事情的详细信息)。

【讨论】:

以上是关于mqtt 协议中订阅者的反馈消息的主要内容,如果未能解决你的问题,请参考以下文章

MQTT协议

MQTT协议

python使用MQTT协议发送订阅消息

通过 mosquitto 了解 MQTT协议

Android使用MQTT订阅及发布消息(初步了解Mqtt以及实现Android操作mqtt服务)

MQTT 协议学习:006-订阅主题