使用 Go 从 Google Pub Sub 读取所有可用消息

Posted

技术标签:

【中文标题】使用 Go 从 Google Pub Sub 读取所有可用消息【英文标题】:Reading all the available messages from Google Pub Sub using Go 【发布时间】:2021-09-28 11:53:27 【问题描述】:

我正在尝试从 google pub-sub 中的某个主题获取所有可用消息。 但是在 go 中,一旦 Pub-Sub 中没有剩余消息,我无法找到可以取消接收回调的配置。

我认为一种方法是使用此答案Google PubSub - Counting messages in topic 中描述的 Google Cloud Monitoring Api 从 Pub-Sub 获取消息总数,然后记录已读取消息的数量并在计数为时调用取消等于这个数字,但我不确定这是否是前进的正确方法。

    var mu sync.Mutex
    received := 0
    sub := client.Subscription(subID)
    cctx, cancel := context.WithCancel(ctx)
    err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) 
            mu.Lock()
            defer mu.Unlock()
            fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
            msg.Ack()
            received++
            if received == TotalNumberOfMessages 
                    cancel()
            
    )
    if err != nil 
            return fmt.Errorf("Receive: %v", err)
    

我也尝试过使用带超时的上下文,即在取消之后获取直到未满足此上下文截止日期。

ctx, cancel := context.WithTimeout(ctx, 100*time.Second)
defer cancel()
err = subscription.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) 

但话又说回来,我无法确定所有消息都已处理完毕。

请提出一个解决方案,以确保订阅功能在 Pub-Sub 中没有剩余消息时停止。

【问题讨论】:

【参考方案1】:

我已经在我以前的公司实现了它(遗憾的是我不再有代码,它在我以前的公司 git...)。但是它起作用了。

原理如下

msg := make(chan *pubsub.Message, 1)
sub := client.Subscription(subID)
cctx, cancel := context.WithCancel(ctx)
go sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) 
    msg <- m
    )
for 
  select 
    case res := <-msg:
      fmt.Fprintf(w, "Got message: %q\n", string(res.Data))
      res.Ack()
  
    case <-time.After(3 * time.Second):
        fmt.Println("timeout")
        cancel()
    

【讨论】:

以上是关于使用 Go 从 Google Pub Sub 读取所有可用消息的主要内容,如果未能解决你的问题,请参考以下文章

我怎样才能使用 Python 从 Google Pub/Sub 中以足够快的速度阅读

Google Pub/Sub 如何在 Pull 上设置读取超时

确认 Apache Beam 上的 Google Pub/Sub 消息

Google Pub/Sub 的 RetryPolicy 中配置的指数退避如何工作?

Google Cloud Pub/Sub 中的积压工作

Google Cloud 上使用 Pub/Sub 的主/从模式