如何通过转发到死信主题来限制 Google Pub/Sub 交付尝试?

Posted

技术标签:

【中文标题】如何通过转发到死信主题来限制 Google Pub/Sub 交付尝试?【英文标题】:How to limit Google Pub/Sub delivery attempts by forwarding to a dead-letter topic? 【发布时间】:2020-10-13 04:42:54 【问题描述】:

我正在尝试使用死信主题(参见https://cloud.google.com/pubsub/docs/dead-letter-topics)配置 Pub/Sub 订阅,以限制消息在收到 nack 时重新传递的次数。为此,我创建了以下示例程序:

package main

import (
    "context"
    "flag"
    "fmt"
    "log"
    "os"
    "time"

    "cloud.google.com/go/pubsub"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

const (
    topicID           = "my-topic"
    deadLetterTopicID = "my-dead-letter-topic"
    subscriptionID    = "my-subscription"
)

var (
    pubsubEmulatorHost string
    projectID          string
)

func main() 
    flag.StringVar(&pubsubEmulatorHost, "pubsubEmulatorHost", "", "Pub/Sub emulator host (e.g. localhost:8085)")
    flag.StringVar(&projectID, "projectID", "my-project", "Google Project ID")
    flag.Parse()

    if pubsubEmulatorHost != "" 
        os.Setenv("PUBSUB_EMULATOR_HOST", pubsubEmulatorHost)
        defer os.Unsetenv("PUBSUB_EMULATOR_HOST")
    

    client, err := pubsub.NewClient(context.Background(), projectID)
    if err != nil 
        log.Fatalf("NewClient: %v", err)
    

    topic, err := client.CreateTopic(context.Background(), topicID)
    if err != nil 
        if status.Code(err) == codes.AlreadyExists 
            topic = client.Topic(topicID)
            log.Printf("Topic %s already exists", topicID)
         else 
            log.Fatalf("CreateTopic: %v", err)
        
    
    defer func() 
        topic.Stop()
        if err := topic.Delete(context.Background()); err != nil 
            log.Fatalf("Delete topic: %v", err)
        
    ()

    deadLetterTopic, err := client.CreateTopic(context.Background(), deadLetterTopicID)
    if err != nil 
        if status.Code(err) == codes.AlreadyExists 
            deadLetterTopic = client.Topic(deadLetterTopicID)
            log.Printf("Topic %s already exists", deadLetterTopicID)
         else 
            log.Fatalf("CreateTopic: %v", err)
        
    
    defer func() 
        deadLetterTopic.Stop()
        if err := deadLetterTopic.Delete(context.Background()); err != nil 
            log.Fatalf("Delete dead-letter topic: %v", err)
        
    ()

    sub, err := client.CreateSubscription(context.Background(), subscriptionID, pubsub.SubscriptionConfig
        Topic: topic,
        DeadLetterPolicy: &pubsub.DeadLetterPolicy
            DeadLetterTopic:     fmt.Sprintf("projects/%s/topics/%s", projectID, deadLetterTopicID),
            MaxDeliveryAttempts: 5,
        ,
    )
    if err != nil 
        log.Fatalf("CreateSubscription: %v", err)
    
    defer func() 
        if err := sub.Delete(context.Background()); err != nil 
            log.Fatalf("Delete subscription: %v", err)
        
    ()

    go func() 
        sub.Receive(context.Background(), func(ctx context.Context, msg *pubsub.Message) 
            log.Printf("Got message %q upon delivery attempt %d", msg.Data, msg.DeliveryAttempt)
            msg.Nack()
        )
    ()

    result := topic.Publish(context.Background(), &pubsub.MessageData: []byte("Hello, world!"))
    messageID, err := result.Get(context.Background())
    if err != nil 
        log.Fatalf("Get message ID of publish call: %v", err)
    
    log.Printf("Published message with ID %s", messageID)
    time.Sleep(20 * time.Second)

脚本以两种模式运行,一种使用真正的 Pub/Sub 项目(此处称为 my-project),另一种通过设置 PUBSUB_EMULATOR_HOST 环境变量使用 GCloud Pub/Sub emulator。我希望,鉴于订阅的 DeadLetterPolicyMaxDeliveryAttempts 设置为 5,nack'd Pub/Sub 消息将被传递大约 5 次(文档表明这是尽最大努力)。但是,如果我在真正的 Pub/Sub 项目上运行脚本,我会得到以下输出:

> go run main.go
2020/06/22 23:59:37 Published message with ID 1294186248588871
2020/06/22 23:59:38 Got message "Hello, world!" upon delivery attempt 824637866440
2020/06/22 23:59:40 Got message "Hello, world!" upon delivery attempt 824634417896
2020/06/22 23:59:41 Got message "Hello, world!" upon delivery attempt 824634418592
2020/06/22 23:59:43 Got message "Hello, world!" upon delivery attempt 824637866928
2020/06/22 23:59:44 Got message "Hello, world!" upon delivery attempt 824638981864
2020/06/22 23:59:45 Got message "Hello, world!" upon delivery attempt 824640667960
2020/06/22 23:59:47 Got message "Hello, world!" upon delivery attempt 824634418712
2020/06/22 23:59:49 Got message "Hello, world!" upon delivery attempt 824638982160
2020/06/22 23:59:50 Got message "Hello, world!" upon delivery attempt 824640667760
2020/06/22 23:59:51 Got message "Hello, world!" upon delivery attempt 824634418000
2020/06/22 23:59:52 Got message "Hello, world!" upon delivery attempt 824633942168
2020/06/22 23:59:53 Got message "Hello, world!" upon delivery attempt 824633942712
2020/06/22 23:59:53 Got message "Hello, world!" upon delivery attempt 824640668296
2020/06/22 23:59:54 Got message "Hello, world!" upon delivery attempt 824637448352
2020/06/22 23:59:55 Got message "Hello, world!" upon delivery attempt 824633943336
2020/06/22 23:59:55 Got message "Hello, world!" upon delivery attempt 824633943448
2020/06/22 23:59:56 Got message "Hello, world!" upon delivery attempt 824633943560
2020/06/22 23:59:57 Got message "Hello, world!" upon delivery attempt 824638259688
2020/06/22 23:59:57 Got message "Hello, world!" upon delivery attempt 824637448752

换句话说,nack 的消息被传递了 19 次,与我预期的 5 次相去甚远。如果我使用 Pub/Sub 模拟器运行程序,我会发现交付尝试始终为 0:

> go run main.go --pubsubEmulatorHost=localhost:8085
2020/06/23 00:00:54 Published message with ID 4
2020/06/23 00:00:54 Got message "Hello, world!" upon delivery attempt 0
2020/06/23 00:00:54 Got message "Hello, world!" upon delivery attempt 0
2020/06/23 00:00:54 Got message "Hello, world!" upon delivery attempt 0
2020/06/23 00:00:54 Got message "Hello, world!" upon delivery attempt 0
2020/06/23 00:00:54 Got message "Hello, world!" upon delivery attempt 0
2020/06/23 00:00:54 Got message "Hello, world!" upon delivery attempt 0
2020/06/23 00:00:54 Got message "Hello, world!" upon delivery attempt 0
2020/06/23 00:00:54 Got message "Hello, world!" upon delivery attempt 0
2020/06/23 00:00:54 Got message "Hello, world!" upon delivery attempt 0
2020/06/23 00:00:54 Got message "Hello, world!" upon delivery attempt 0
2020/06/23 00:00:55 Got message "Hello, world!" upon delivery attempt 0
2020/06/23 00:00:55 Got message "Hello, world!" upon delivery attempt 0
2020/06/23 00:00:55 Got message "Hello, world!" upon delivery attempt 0
2020/06/23 00:00:55 Got message "Hello, world!" upon delivery attempt 0
2020/06/23 00:00:55 Got message "Hello, world!" upon delivery attempt 0
2020/06/23 00:00:55 Got message "Hello, world!" upon delivery attempt 0
2020/06/23 00:00:55 Got message "Hello, world!" upon delivery attempt 0
2020/06/23 00:00:55 Got message "Hello, world!" upon delivery attempt 0
2020/06/23 00:00:55 Got message "Hello, world!" upon delivery attempt 0
2020/06/23 00:00:55 Got message "Hello, world!" upon delivery attempt 0
2020/06/23 00:00:56 Got message "Hello, world!" upon delivery attempt 0
...

为简洁起见,此处的输出被截断,但消息打印了大约 200 次(每秒 10 次,持续 20 秒),再次远高于我预期的 5 次。

DeadLetterPolicyMaxDeliveryAttempts 字段是否不应该限制 nack 消息的传递尝试次数?为什么DeliveryAttempt 字段是一个如此奇怪的整数序列,而不是每次简单地递增 1 的整数序列(参见https://pkg.go.dev/cloud.google.com/go/pubsub?tab=doc#Message)?

【问题讨论】:

一般情况下,当您没有授予所需的权限时(here 在 Dead Letter Queue Options 下),因此 PubSub 可以发布给您的死者信主题或订阅您的订阅。另外,我必须指出,如果写入死信队列主题失败,PubSub 将继续将消息传递给您的订阅者。 您可以使用以下命令授予权限:首先:PUBSUB_SERVICE_ACCOUNT="service-$PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com" 然后:gcloud pubsub topics add-iam-policy-binding <dead letter topic> \ --member="serviceAccount:$PUBSUB_SERVICE_ACCOUNT"\ --role='roles/pubsub.publisher' 最后:gcloud pubsub subscriptions add-iam-policy-binding <subscription with dead letter queue> \ --member="serviceAccount:$PUBSUB_SERVICE_ACCOUNT"\ --role='roles/pubsub.subscriber'。它对你有用吗? 正如 Alexandre 所提到的,您需要为 Cloud Pub/Sub 服务帐户提供正确的权限。此外,模拟器目前不支持死信主题。 【参考方案1】:

为了进一步为社区做出贡献,我根据我在评论部分中提到的内容发布此答案。

您描述的问题通常发生在您未提供所需权限(死信队列选项下的here)时,因此 PubSub 可以发布到您的死信主题或订阅您的订阅。另外,我必须指出,如果写入死信队列主题失败,PubSub 将继续将消息传递给您的订阅者。

为了给予必要的权限,你可以在你的shell环境中使用以下命令:

PUBSUB_SERVICE_ACCOUNT="service-$PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com"

gcloud pubsub topics add-iam-policy-binding <dead letter topic> \  --member="serviceAccount:$PUBSUB_SERVICE_ACCOUNT"\  --role='roles/pubsub.publisher'

gcloud pubsub subscriptions add-iam-policy-binding <subscription with dead letter queue> \  --member="serviceAccount:$PUBSUB_SERVICE_ACCOUNT"\  --role='roles/pubsub.subscriber'

另外,我想添加@Mahesh Gattani 评论,其中提到模拟器目前不支持死信主题。

【讨论】:

以上是关于如何通过转发到死信主题来限制 Google Pub/Sub 交付尝试?的主要内容,如果未能解决你的问题,请参考以下文章

Google Cloud Pub/Sub - 捕获发送到死信主题的消息的消息传递失败原因[关闭]

gcp 云函数 pub/sub 主题死信

使用 Cloud Functions、Pub/Sub 和死信队列/主题时,我的方法是不是正确?

Google Pub/Sub - 多个订阅同一主题以通过 REST API 减少负载?

Google Pubsub - 接收推送订阅的传递尝试

如何使用 Java 将错误消息移动到 Azure 死信队列(主题 - 订阅)?