Go MQTT 客户端在收到大约 2400 条消息后停止监听新消息

Posted

技术标签:

【中文标题】Go MQTT 客户端在收到大约 2400 条消息后停止监听新消息【英文标题】:Go MQTT client stops listening new messages after receiving around 2400 messages 【发布时间】:2022-01-21 07:32:03 【问题描述】:

我有一个监听代理的 Paho MQTT GO 客户端。在大约 2400-2500 条消息之后,它会停止收听新消息。有趣的是,它并没有与经纪人断开连接。似乎它仍然处于活动状态并正在收听,但不再出现新消息。

下面是我的代码-

package main

import (
    "flag"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"

    mqtt "github.com/eclipse/paho.mqtt.golang"
)

var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) 
    fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())


var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) 
    fmt.Println("Connected")


var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) 
    fmt.Printf("Connect lost: %v", err)



func createClient() mqtt.Client 
    var broker = "*******"
    var port = 1883
    opts := mqtt.NewClientOptions()
    opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
    opts.SetClientID("go_mqtt_client_test1")
    opts.SetDefaultPublishHandler(messagePubHandler)
    opts.OnConnect = connectHandler
    opts.OnConnectionLost = connectLostHandler
    return mqtt.NewClient(opts)


func main() 

    var client = createClient()
    if token := client.Connect(); token.Wait() && token.Error() != nil 
        panic(token.Error())
    

    // load command line arguments if any
    name := flag.String("name", "world", "name to print")
    flag.Parse()

    log.Printf("Starting sleepservice for %s", *name)

    // setup signal catching
    sigs := make(chan os.Signal, 1)

    // catch all signals since not explicitly listing
    //signal.Notify(sigs)
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

    // method invoked upon seeing signal
    go func() 
        s := <-sigs
        log.Printf("RECEIVED SIGNAL: %s", s)

        switch s 
        case syscall.SIGINT:
            AppCleanup(client)
            os.Exit(1)
        case syscall.SIGTERM:
            AppCleanup(client)
            os.Exit(1)
        case syscall.SIGQUIT:
            AppCleanup(client)
            os.Exit(1)
        default:
            log.Printf("not supported Signal")
        

    ()

    sub(client)

    for  /* Endless Loop */
    



func AppCleanup(client mqtt.Client) 
    client.Disconnect(250)
    log.Println("CLEANUP APP BEFORE EXIT!!!")


func sub(client mqtt.Client) 
    topic := "test/topic"
    token := client.Subscribe(topic, 1, nil)
    token.Wait()
    fmt.Printf("Subscribed to topic: %s", topic)



这里,我隐藏了broker的IP

非常感谢您的帮助。提前致谢。

【问题讨论】:

使用无限循环不是一个好策略 - 请参阅 this demo 了解替代方法。您能否分享有关您的经纪人的信息(最好是经纪人日志)。另请确认您使用的库版本。 【参考方案1】:

最后我发现了问题,是最后一个无限循环导致了问题。我删除了它,现在问题似乎解决了。谢谢@asad 和@Brits

【讨论】:

我想我应该注意,虽然无限的无操作循环是一个问题(它不必要地使用资源),但我不希望它本身会导致这样的症状。 paho 客户端运行许多 goroutines,过去已经发现/解决了死锁情况。我怀疑引入无限循环揭示了另一个通常隐藏的问题(不幸的是,诊断这种事情可能很棘手!)。很高兴它为你工作(我有客户使用这个库已经一年多了,所以它可以稳定,但我不能说它没有错误!)。

以上是关于Go MQTT 客户端在收到大约 2400 条消息后停止监听新消息的主要内容,如果未能解决你的问题,请参考以下文章

Python:MQTT代理消息批量插入mysql数据库

前几条消息在传输到离线的 mqtt 客户端时丢失

Android APP必备高级功能,消息推送之MQTT

mqtt:避免缓冲消息

如何在Paho中获取最后五条消息?

2019-07-10-mqtt-mosquitto系列13之共享主题