PubSub 不确认消息

Posted

技术标签:

【中文标题】PubSub 不确认消息【英文标题】:PubSub isn't acknowledging messages 【发布时间】:2020-08-08 01:08:00 【问题描述】:

我有一个 pubsub 订阅(除了 go-routines 的数量之外的所有默认设置都是 1000),并且由于某种原因,消息永远不会得到确认,因此会重新传递。重新投递需要 1 到 2 分钟。我在收到消息后不到 1 秒就打电话给message.Ack(),所以我不明白发生了什么。这不应该是因为应用程序和 pubsub 本身之间存在延迟,因为在向主题发布消息后,消息实际上会立即传递。

订阅的确认截止日期为 10 秒。我尝试将其增加到 120,但仍然出现相同的问题。我想不出这些消息没有被确认并因此被重新传递的任何原因。

参考代码:

if err := pubsubSubscription(client).Receive(ctx, func(lctx context.Context, message *pubsub.Message) 
    log.Println("Received message") // occurs < 1s after publishing
    ack := message.Ack  
    if err := adapters.Handle(conn, id, gatewayAddr, message.Data); err != nil 
        log.Println("Will nack message")
        ack = message.Nack // not reached (in this context/example)
        cancel()
    
    log.Println("Will ack message") // occurs ~200µs after message receipt
    ack()
); err != nil 
    return fmt.Errorf("unable to subscribe to PubSub messages: %s", err)

为了澄清,我只向该主题发布了 1 条消息,但该回调每 1 或 2 分钟无限调用一次。

编辑

仅当订阅接收设置中的 go-routines 数量设置为高于 runtime.NumCPU() 的数字时才会发生这种情况。这是预期的行为吗?如果是这样,这如何与 Kubernetes(我正在使用)一起工作?

EDIT 2 -- 要求复制完整代码

const (
    DefaultMaxOutstandingMessages = 1000000
    DefaultMaxOutstandingBytes    = 1e9
)

func SubscribeToTables(id int) error 
    var opts []option.ClientOption
    if sa := os.Getenv("SERVICE_ACCOUNT"); sa != "" 
        opts = append(opts, option.WithCredentialsJSON([]byte(sa)))
    

    ctx := context.Background()
    projectID := os.Getenv("PROJECT_ID")
    client, err := pubsub.NewClient(ctx, projectID, opts...)
    if err != nil 
        return fmt.Errorf("error creating GCP PubSub client: %s", err)
    

    cctx, cancel := context.WithCancel(ctx)
    go func() 
        qch := make(chan os.Signal)
        signal.Notify(qch, os.Interrupt, syscall.SIGTERM)
        <-qch
        cancel()
    ()

    mch := make(chan *pubsub.Message)
    gatewayAddr := os.Getenv("GATEWAY_ADDRESS")
    conn, err := adapters.GetGatewayConn(gatewayAddr)
    if err != nil 
        return fmt.Errorf("unable to connect to Gateway: %s", err)
    
    go func() 
        for 
            select 
            case message := <-mch:
                if err := adapters.Handle(conn, id, gatewayAddr, message.Data); err != nil 
                    cancel()
                    return
                
                message.Ack()
            case <-ctx.Done():
                return
            
        
    ()
    if err := pubsubSubscription(client).Receive(cctx, func(_ context.Context, message *pubsub.Message) 
        mch <- message
    ); err != nil 
        return fmt.Errorf("unable to subscribe to PubSub messages: %s", err)
    
    return nil


func pubsubSubscription(client *pubsub.Client) *pubsub.Subscription 
    sub := client.Subscription(os.Getenv("SUBSCRIPTION_ID"))
    sub.ReceiveSettings = pubsub.ReceiveSettings
        MaxExtension:       pubsub.DefaultReceiveSettings.MaxExtension,
        MaxExtensionPeriod: pubsub.DefaultReceiveSettings.MaxExtensionPeriod,
        MaxOutstandingMessages: parsePubSubReceiveSetting(
            "MAX_OUTSTANDING_MESSAGES",
            "max outstanding messages",
            DefaultMaxOutstandingMessages,
        ),
        MaxOutstandingBytes: parsePubSubReceiveSetting(
            "MAX_OUTSTANDING_BYTES",
            "max outstanding bytes",
            DefaultMaxOutstandingBytes,
        ),
        NumGoroutines: parsePubSubReceiveSetting( // if this is higher than runtimie.NumCPU(), the aforementioned issue occurs 
            "NUM_GO_ROUTINES",
            "Go-routines",
            1000, 
        ),
    
    return sub


func parsePubSubReceiveSetting(env, name string, defaultValue int) int 
    e := os.Getenv(env)
    i, err := strconv.Atoi(e)
    if err != nil 
        log.Printf("Unable to parse number of GCP PubSub %s. Can't parse '%s' as int", name, e)
        log.Printf("Using default number of %s (%d)", name, defaultValue)
        return defaultValue
    
    return i

【问题讨论】:

【参考方案1】:

我怀疑您退出代码太快了。您必须 cancel() 停止 Receive 循环并将数据刷新回 PubSub 的上下文。

尝试在ack() 之后添加cancel()

【讨论】:

取消整个订阅。显然我想保持接收功能运行;我只是不想收到重复的消息 在这种情况下,您可以重用code snippet in the documentation。您的接收函数只需将消息发布到通道中,并在通道上发送一个侧面 goroutine 等待值。您还可以在上下文中添加超时,以停止无限循环。 你能分享一个功能性和最小的代码来重现你的案例吗?

以上是关于PubSub 不确认消息的主要内容,如果未能解决你的问题,请参考以下文章

[已解决]Pubsub 推送订阅不确认消息

Firebase:我可以在 Firebase 云功能中“不确认”一条 PubSub 消息吗?

在 MessageReciever 之外确认 pubSub 消息

Google PubSub Request 消息即使确认?

如何在 Firebase Cloud Functions 中确认 PubSub 消息?

GCP 云功能未正确接收/确认 PubSub 消息