Pubsub.pull 请求无法正常工作 - 去吧

Posted

技术标签:

【中文标题】Pubsub.pull 请求无法正常工作 - 去吧【英文标题】:Pubsub.pull request does not working properly - go 【发布时间】:2020-02-17 10:19:36 【问题描述】:

我正在尝试使用 go 客户端库一次从 pub-sub 订阅中提取 1 条消息。但是即使订阅中存在消息,消息也不会拉取请求请求。订阅者正在等待处理所有消息。

我正在尝试使用基本代码,一次提取一条消息。我使用了两个实例,并在两个实例的后台运行脚本(创建订阅者)4 次。我已将 ack_deadline 设置为 10 秒。 我期待的结果是每个订阅者都应该在一条消息确认后从订阅中获取下一条消息。但是在最后一条消息处理完成之前,消息不会拉入实例。 为什么在一条消息处理完成后没有拉取消息?据我所知,不应该对实例或订阅者有任何依赖。 让 mi 知道需要设置的任何其他更改或参数。 提前致谢。

这是一个实例的日志:

2019/10/21 05:22:07 Got message: Message 0 at 2019-10-21 05:22:07.022772532 
2019/10/21 05:22:11 Got message: Message 1 at 2019-10-21 05:22:11.330566981 
2019/10/21 05:22:14 Got message: Message 2 at 2019-10-21 05:22:14.803031569 
2019/10/21 05:22:18 Got message: Message 3 at 2019-10-21 05:22:18.452912271 
2019/10/21 05:38:39 Acking message: Message 3 at 2019-10-21 05:38:39.471739478 
2019/10/21 05:39:10 Acking message: Message 0 at 2019-10-21 05:39:10.039336794 
2019/10/21 05:41:22 Acking message: Message 1 at 2019-10-21 05:41:22.351124342 
2019/10/21 05:50:31 Acking message: Message 2 at 2019-10-21 05:50:31.829087762 
2019/10/21 05:50:39 Got message: Message 13 at 2019-10-21 05:50:39.005916608
2019/10/21 05:50:39 Got message: Message 11 at 2019-10-21 05:50:39.00623238 
2019/10/21 05:50:39 Got message: Message 15 at 2019-10-21 05:50:39.007216256
2019/10/21 05:50:39 Got message: Message 12 at 2019-10-21 05:50:39.008066257 

第二个实例的日志:

2019/10/21 05:22:29 Got message: Message 4 at 2019-10-21 05:22:29.331569077 
2019/10/21 05:22:33 Got message: Message 5 at 2019-10-21 05:22:33.018801275 
2019/10/21 05:22:36 Got message: Message 6 at 2019-10-21 05:22:36.803434547 
2019/10/21 05:22:40 Got message: Message 7 at 2019-10-21 05:22:40.409314927 
2019/10/21 05:39:38 Acking message: Message 4 at 2019-10-21 05:39:38.349619635 
2019/10/21 05:42:42 Acking message: Message 6 at 2019-10-21 05:42:42.819874065 
2019/10/21 05:47:40 Acking message: Message 5 at 2019-10-21 05:47:40.049128075 
2019/10/21 05:50:38 Acking message: Message 7 at 2019-10-21 05:50:38.42874031 
2019/10/21 05:50:39 Got message: Message 8 at 2019-10-21 05:50:39.005090906 
2019/10/21 05:50:39 Got message: Message 9 at 2019-10-21 05:50:39.005334146 
2019/10/21 05:50:39 Got message: Message 16 at 2019-10-21 05:50:39.006427796 
2019/10/21 05:50:39 Got message: Message 14 at 2019-10-21 05:50:39.007231713 
package main
// [START pubsub_publish_with_error_handling_that_scales]
import (
    "context"
    "fmt"
    "os"
    "log"
    "time"
    "math/rand"
    pubsub "cloud.google.com/go/pubsub/apiv1"
    pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1"
)


func main()
    f, _:= os.OpenFile("testlogfile", os.O_RDWR | os.O_CREATE | os.O_APPEND, 0666)
    defer f.Close()
    log.SetOutput(f)
    rand.Seed(time.Now().UTC().UnixNano())
    pullMsgs("sureline-dev-1264", "sub7")


func random(min, max int) int 
    return rand.Intn(max - min) + min


func pullMsgs(projectID, subscriptionID string) error 
    ctx := context.Background()
    client, err := pubsub.NewSubscriberClient(ctx)
    if err != nil 
        log.Fatal(err)
    
    defer client.Close()
    sub := fmt.Sprintf("projects/%s/subscriptions/%s", projectID, subscriptionID)
// Be sure to tune the MaxMessages parameter per your project's needs, and accordingly
// adjust the ack behavior below to batch acknowledgements.
    req := pubsubpb.PullRequest
        Subscription: sub,
        MaxMessages:  1,
    

    fmt.Println("Listening..")

    for 
        res, err := client.Pull(ctx, &req)
        if err != nil 
            log.Fatal(err)
        

    // client.Pull returns an empty list if there are no messages available in the
    // backlog. We should skip processing steps when that happens.
        if len(res.ReceivedMessages) == 0 
            continue
        

        var recvdAckIDs []string
        for _, m := range res.ReceivedMessages 
            recvdAckIDs = append(recvdAckIDs, m.AckId)
        

        var done = make(chan struct)
        var delay = 0 * time.Second // Tick immediately upon reception
        var ackDeadline = 10 * time.Second

    // Continuously notify the server that processing is still happening on this batch.
        go func() 
            for 
                select 
                case <-ctx.Done():
                    return
                case <-done:
                    return
                case <-time.After(delay):
                    err := client.ModifyAckDeadline(ctx, &pubsubpb.ModifyAckDeadlineRequest
                        Subscription:       sub,
                        AckIds:             recvdAckIDs,
                        AckDeadlineSeconds: int32(ackDeadline.Seconds()),
                    )
                    if err != nil 
                        log.Fatal(err)
                    
                    delay = ackDeadline - 5*time.Second // 5 seconds grace period.
                
            
        ()

        for _, m := range res.ReceivedMessages 
            // Process the message here, possibly in a goroutine.
            log.Printf("Got message: %s at %v", string(m.Message.Data), time.Now())
            fmt.Printf("Got message: %s at %v", string(m.Message.Data), time.Now())
            myrand := random(240, 420)
            log.Printf("Sleeping %d seconds...\n", myrand)
            time.Sleep(time.Duration(myrand)*time.Second)
            err := client.Acknowledge(ctx, &pubsubpb.AcknowledgeRequest
                Subscription: sub,
                AckIds:       []stringm.AckId,
            )
            log.Printf("Acking message: %s at %v", string(m.Message.Data), time.Now())
            fmt.Printf("Acking message: %s at %v", string(m.Message.Data), time.Now())
            if err != nil 
                log.Fatal(err)
            
        

        close(done)
    

我希望输出类似于在完成第一条消息处理后从订阅中获取下一条消息。它不应该依赖于任何其他实例。

【问题讨论】:

我删除了我的答案,我没有很好地理解你的问题,我没有很好地看到日志时间条目。 【参考方案1】:

尝试以这种方式一次提取一条消息是 Cloud Pub/Sub 的反模式。在您的情况下,您的订阅者可能最终与不同的服务器进行通信,这些服务器分配了要发送给连接到它们的订阅者的消息。 Cloud Pub/Sub 期望同时从使用此方法接收消息的客户端接收多个拉取请求。因此,您应该同时处理多个未完成的拉取请求,或者您应该使用asynchronous pull via the Receive method。

【讨论】:

以上是关于Pubsub.pull 请求无法正常工作 - 去吧的主要内容,如果未能解决你的问题,请参考以下文章

如何通过 terraform 使用服务帐户创建 google cloud pubsub pull 订阅?

Gmail API 监视请求无法正常工作

Ajax 请求在 Django 中无法正常工作

Alamofire GET请求无法正常工作

jQuery AJAX 获取请求无法正常工作,返回值无法在控制台显示

Flutter HTTP 发布请求 JSON 处理无法正常工作