Pubsub.pull 请求无法正常工作 - 去吧
Posted
技术标签:
【中文标题】Pubsub.pull 请求无法正常工作 - 去吧【英文标题】:Pubsub.pull request does not working properly - go 【发布时间】:2020-02-17 10:19:36 【问题描述】:我正在尝试使用 go 客户端库一次从 pub-sub 订阅中提取 1 条消息。但是即使订阅中存在消息,消息也不会拉取请求请求。订阅者正在等待处理所有消息。
我正在尝试使用基本代码,一次提取一条消息。我使用了两个实例,并在两个实例的后台运行脚本(创建订阅者)4 次。我已将 ack_deadline 设置为 10 秒。 我期待的结果是每个订阅者都应该在一条消息确认后从订阅中获取下一条消息。但是在最后一条消息处理完成之前,消息不会拉入实例。 为什么在一条消息处理完成后没有拉取消息?据我所知,不应该对实例或订阅者有任何依赖。 让 mi 知道需要设置的任何其他更改或参数。 提前致谢。
这是一个实例的日志:
2019/10/21 05:22:07 Got message: Message 0 at 2019-10-21 05:22:07.022772532
2019/10/21 05:22:11 Got message: Message 1 at 2019-10-21 05:22:11.330566981
2019/10/21 05:22:14 Got message: Message 2 at 2019-10-21 05:22:14.803031569
2019/10/21 05:22:18 Got message: Message 3 at 2019-10-21 05:22:18.452912271
2019/10/21 05:38:39 Acking message: Message 3 at 2019-10-21 05:38:39.471739478
2019/10/21 05:39:10 Acking message: Message 0 at 2019-10-21 05:39:10.039336794
2019/10/21 05:41:22 Acking message: Message 1 at 2019-10-21 05:41:22.351124342
2019/10/21 05:50:31 Acking message: Message 2 at 2019-10-21 05:50:31.829087762
2019/10/21 05:50:39 Got message: Message 13 at 2019-10-21 05:50:39.005916608
2019/10/21 05:50:39 Got message: Message 11 at 2019-10-21 05:50:39.00623238
2019/10/21 05:50:39 Got message: Message 15 at 2019-10-21 05:50:39.007216256
2019/10/21 05:50:39 Got message: Message 12 at 2019-10-21 05:50:39.008066257
第二个实例的日志:
2019/10/21 05:22:29 Got message: Message 4 at 2019-10-21 05:22:29.331569077
2019/10/21 05:22:33 Got message: Message 5 at 2019-10-21 05:22:33.018801275
2019/10/21 05:22:36 Got message: Message 6 at 2019-10-21 05:22:36.803434547
2019/10/21 05:22:40 Got message: Message 7 at 2019-10-21 05:22:40.409314927
2019/10/21 05:39:38 Acking message: Message 4 at 2019-10-21 05:39:38.349619635
2019/10/21 05:42:42 Acking message: Message 6 at 2019-10-21 05:42:42.819874065
2019/10/21 05:47:40 Acking message: Message 5 at 2019-10-21 05:47:40.049128075
2019/10/21 05:50:38 Acking message: Message 7 at 2019-10-21 05:50:38.42874031
2019/10/21 05:50:39 Got message: Message 8 at 2019-10-21 05:50:39.005090906
2019/10/21 05:50:39 Got message: Message 9 at 2019-10-21 05:50:39.005334146
2019/10/21 05:50:39 Got message: Message 16 at 2019-10-21 05:50:39.006427796
2019/10/21 05:50:39 Got message: Message 14 at 2019-10-21 05:50:39.007231713
package main
// [START pubsub_publish_with_error_handling_that_scales]
import (
"context"
"fmt"
"os"
"log"
"time"
"math/rand"
pubsub "cloud.google.com/go/pubsub/apiv1"
pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1"
)
func main()
f, _:= os.OpenFile("testlogfile", os.O_RDWR | os.O_CREATE | os.O_APPEND, 0666)
defer f.Close()
log.SetOutput(f)
rand.Seed(time.Now().UTC().UnixNano())
pullMsgs("sureline-dev-1264", "sub7")
func random(min, max int) int
return rand.Intn(max - min) + min
func pullMsgs(projectID, subscriptionID string) error
ctx := context.Background()
client, err := pubsub.NewSubscriberClient(ctx)
if err != nil
log.Fatal(err)
defer client.Close()
sub := fmt.Sprintf("projects/%s/subscriptions/%s", projectID, subscriptionID)
// Be sure to tune the MaxMessages parameter per your project's needs, and accordingly
// adjust the ack behavior below to batch acknowledgements.
req := pubsubpb.PullRequest
Subscription: sub,
MaxMessages: 1,
fmt.Println("Listening..")
for
res, err := client.Pull(ctx, &req)
if err != nil
log.Fatal(err)
// client.Pull returns an empty list if there are no messages available in the
// backlog. We should skip processing steps when that happens.
if len(res.ReceivedMessages) == 0
continue
var recvdAckIDs []string
for _, m := range res.ReceivedMessages
recvdAckIDs = append(recvdAckIDs, m.AckId)
var done = make(chan struct)
var delay = 0 * time.Second // Tick immediately upon reception
var ackDeadline = 10 * time.Second
// Continuously notify the server that processing is still happening on this batch.
go func()
for
select
case <-ctx.Done():
return
case <-done:
return
case <-time.After(delay):
err := client.ModifyAckDeadline(ctx, &pubsubpb.ModifyAckDeadlineRequest
Subscription: sub,
AckIds: recvdAckIDs,
AckDeadlineSeconds: int32(ackDeadline.Seconds()),
)
if err != nil
log.Fatal(err)
delay = ackDeadline - 5*time.Second // 5 seconds grace period.
()
for _, m := range res.ReceivedMessages
// Process the message here, possibly in a goroutine.
log.Printf("Got message: %s at %v", string(m.Message.Data), time.Now())
fmt.Printf("Got message: %s at %v", string(m.Message.Data), time.Now())
myrand := random(240, 420)
log.Printf("Sleeping %d seconds...\n", myrand)
time.Sleep(time.Duration(myrand)*time.Second)
err := client.Acknowledge(ctx, &pubsubpb.AcknowledgeRequest
Subscription: sub,
AckIds: []stringm.AckId,
)
log.Printf("Acking message: %s at %v", string(m.Message.Data), time.Now())
fmt.Printf("Acking message: %s at %v", string(m.Message.Data), time.Now())
if err != nil
log.Fatal(err)
close(done)
我希望输出类似于在完成第一条消息处理后从订阅中获取下一条消息。它不应该依赖于任何其他实例。
【问题讨论】:
我删除了我的答案,我没有很好地理解你的问题,我没有很好地看到日志时间条目。 【参考方案1】:尝试以这种方式一次提取一条消息是 Cloud Pub/Sub 的反模式。在您的情况下,您的订阅者可能最终与不同的服务器进行通信,这些服务器分配了要发送给连接到它们的订阅者的消息。 Cloud Pub/Sub 期望同时从使用此方法接收消息的客户端接收多个拉取请求。因此,您应该同时处理多个未完成的拉取请求,或者您应该使用asynchronous pull via the Receive method。
【讨论】:
以上是关于Pubsub.pull 请求无法正常工作 - 去吧的主要内容,如果未能解决你的问题,请参考以下文章
如何通过 terraform 使用服务帐户创建 google cloud pubsub pull 订阅?