Google Pub/Sub 的 RetryPolicy 中配置的指数退避如何工作?

Posted

技术标签:

【中文标题】Google Pub/Sub 的 RetryPolicy 中配置的指数退避如何工作?【英文标题】:How does the exponential backoff configured in Google Pub/Sub's RetryPolicy work? 【发布时间】:2020-11-19 17:14:11 【问题描述】:

最近发布的cloud.google.com/go/pubsub 库(在v1.5.0 中,参见https://github.com/googleapis/google-cloud-go/releases/tag/pubsub%2Fv1.5.0)支持新的RetryPolicy 服务器端功能。当前的文档 (https://godoc.org/cloud.google.com/go/pubsub#RetryPolicy) 为

我已阅读 Wikipedia 文章,虽然它描述了离散时间的指数退避,但我看不出这篇文章与 MinimumBackoffMaximumBackoff 参数的具体关系。有关这方面的指导,我参考了github.com/cenkalti/backoff、https://pkg.go.dev/github.com/cenkalti/backoff/v4?tab=doc#ExponentialBackOff 的文档。该库将ExponentialBackoff 定义为

type ExponentialBackOff struct 
    InitialInterval     time.Duration
    RandomizationFactor float64
    Multiplier          float64
    MaxInterval         time.Duration
    // After MaxElapsedTime the ExponentialBackOff returns Stop.
    // It never stops if MaxElapsedTime == 0.
    MaxElapsedTime time.Duration
    Stop           time.Duration
    Clock          Clock
    // contains filtered or unexported fields

每个随机区间的计算方式为

randomized interval =
    RetryInterval * (random value in range [1 - RandomizationFactor, 1 + RandomizationFactor])

其中RetryInterval 是当前重试间隔,据我所知,它从InitialInterval 的值开始,并以MaxInterval 为上限。

我是否正确理解MinimumBackoffMaximumBackoff 对应于github.com/cenkalti/backoff 中的InitialIntervalMaxInterval?也就是说MinimumBackoff是初始等待时间,MaximumBackoff是重试之间允许的最大时间?

为了检验我的理论,我编写了以下简化程序:

package main

import (
    "context"
    "flag"
    "fmt"
    "log"
    "os"
    "time"

    "cloud.google.com/go/pubsub"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

var (
    projectID                      string
    minimumBackoff, maximumBackoff time.Duration
)

const (
    topicName             = "test-topic"
    subName               = "test-subscription"
    defaultMinimumBackoff = 10 * time.Second
    defaultMaximumBackoff = 10 * time.Minute
)

func main() 
    flag.StringVar(&projectID, "projectID", "my-project", "Google Project ID")
    flag.DurationVar(&minimumBackoff, "minimumBackoff", 5*time.Second, "minimum backoff")
    flag.DurationVar(&maximumBackoff, "maximumBackoff", 60*time.Second, "maximum backoff")
    flag.Parse()
    log.Printf("Running with minumum backoff %v and maximum backoff %v...", minimumBackoff, maximumBackoff)

    retryPolicy := &pubsub.RetryPolicyMinimumBackoff: minimumBackoff, MaximumBackoff: maximumBackoff

    client, err := pubsub.NewClient(context.Background(), projectID)
    if err != nil 
        log.Fatalf("NewClient: %v", err)
    

    topic, err := client.CreateTopic(context.Background(), topicName)
    if err != nil 
        log.Fatalf("CreateTopic: %v", err)
    
    log.Printf("Created topic %q", topicName)
    defer func() 
        topic.Stop()
        if err := topic.Delete(context.Background()); err != nil 
            log.Fatalf("Delete topic: %v", err)
        
        log.Printf("Deleted topic %s", topicName)
    ()

    sub, err := client.CreateSubscription(context.Background(), subName, pubsub.SubscriptionConfig
        Topic:       topic,
        RetryPolicy: retryPolicy,
    )
    if err != nil 
        log.Fatalf("CreateSubscription: %v", err)
    
    log.Printf("Created subscription %q", subName)
    defer func() 
        if err := sub.Delete(context.Background()); err != nil 
            log.Fatalf("Delete subscription: %v", err)
        
        log.Printf("Deleted subscription %q", subName)
    ()

    go func() 
        sub.Receive(context.Background(), func(ctx context.Context, msg *pubsub.Message) 
            log.Printf("Nacking message: %s", msg.Data)
            msg.Nack()
        )
    ()

    topic.Publish(context.Background(), &pubsub.MessageData: []byte("Hello, world!"))
    log.Println("Published message")
    time.Sleep(60 * time.Second)

如果我分别使用 5s 和 60s 的标志默认 MinimumBackoffMaximumBackoff 运行它,我会得到以下输出:

> go run main.go
2020/07/29 18:49:32 Running with minumum backoff 5s and maximum backoff 1m0s...
2020/07/29 18:49:33 Created topic "test-topic"
2020/07/29 18:49:34 Created subscription "test-subscription"
2020/07/29 18:49:34 Published message
2020/07/29 18:49:36 Nacking message: Hello, world!
2020/07/29 18:49:45 Nacking message: Hello, world!
2020/07/29 18:49:56 Nacking message: Hello, world!
2020/07/29 18:50:06 Nacking message: Hello, world!
2020/07/29 18:50:17 Nacking message: Hello, world!
2020/07/29 18:50:30 Nacking message: Hello, world!
2020/07/29 18:50:35 Deleted subscription "test-subscription"
2020/07/29 18:50:35 Deleted topic test-topic

而如果我分别使用 1s 和 2s 的 MinimumBackoffMaximumBackoff 运行它,我会得到

> go run main.go --minimumBackoff=1s --maximumBackoff=2s
2020/07/29 18:50:42 Running with minumum backoff 1s and maximum backoff 2s...
2020/07/29 18:51:11 Created topic "test-topic"
2020/07/29 18:51:12 Created subscription "test-subscription"
2020/07/29 18:51:12 Published message
2020/07/29 18:51:15 Nacking message: Hello, world!
2020/07/29 18:51:18 Nacking message: Hello, world!
2020/07/29 18:51:21 Nacking message: Hello, world!
2020/07/29 18:51:25 Nacking message: Hello, world!
2020/07/29 18:51:28 Nacking message: Hello, world!
2020/07/29 18:51:31 Nacking message: Hello, world!
2020/07/29 18:51:35 Nacking message: Hello, world!
2020/07/29 18:51:38 Nacking message: Hello, world!
2020/07/29 18:51:40 Nacking message: Hello, world!
2020/07/29 18:51:44 Nacking message: Hello, world!
2020/07/29 18:51:47 Nacking message: Hello, world!
2020/07/29 18:51:50 Nacking message: Hello, world!
2020/07/29 18:51:52 Nacking message: Hello, world!
2020/07/29 18:51:54 Nacking message: Hello, world!
2020/07/29 18:51:57 Nacking message: Hello, world!
2020/07/29 18:52:00 Nacking message: Hello, world!
2020/07/29 18:52:03 Nacking message: Hello, world!
2020/07/29 18:52:06 Nacking message: Hello, world!
2020/07/29 18:52:09 Nacking message: Hello, world!
2020/07/29 18:52:12 Nacking message: Hello, world!
2020/07/29 18:52:13 Deleted subscription "test-subscription"
2020/07/29 18:52:13 Deleted topic test-topic

在后一个示例中,nack 之间的时间似乎非常一致 ~3 秒,这大概代表了在 MaximumBackoff 的 2 秒内做到这一点的“最大努力”?我仍然不清楚的是是否有任何随机化,是否有乘数(从第一个例子来看,重试之间的时间似乎不是每次都加倍),以及是否有等价的的MaxElapsedTime 中没有更多的重试?

【问题讨论】:

【参考方案1】:

最小回退和最大回退的重试策略字段类似于上面示例中的 InitialInterval 和 MaxInterval。 Cloud Pub/Sub 使用您提到的类似公式来计算指数延迟。这也包括随机化。

在 MaxInterval 之外,每次后续重试都会增加 MaxInterval 延迟。如果您想在尝试一定次数后停止重试,我们建议使用Dead Letter Queues。

【讨论】:

酷,我想我的下一个问题是:RetryPolicy 的实现中的MultiplierRandomizationFactor 值是什么?在第一个示例中,nacks 之间的时间似乎增长非常缓慢,这表明 Multiplier 接近于 1。 这些是系统内部细节,会随着时间而改变。我们不推荐依赖他们的用户。

以上是关于Google Pub/Sub 的 RetryPolicy 中配置的指数退避如何工作?的主要内容,如果未能解决你的问题,请参考以下文章

访问 Google Cloud Storage 触发事件“Pub/Sub”?

将 BigQuery 表流式传输到 Google Pub/Sub

Google Pub/Sub Java 示例

带有推送消费者的 Google Pub/Sub

Eventbus 和 Google Pub/SUb 之间的区别

将 Angular 中的点击发送到 Google Pub/Sub