GCP Pub/sub:使用 goroutine 让多个订阅者在一个应用程序中运行

Posted

技术标签:

【中文标题】GCP Pub/sub:使用 goroutine 让多个订阅者在一个应用程序中运行【英文标题】:GCP Pub/sub: using goroutines to make multiple subscriber running in one application 【发布时间】:2020-10-19 10:44:09 【问题描述】:

我在接收来自 GCP Pub/Sub 的消息时发现了一个奇怪的行为。 以下代码是我使用pubsub client注册订阅的方法

gcp.go

package gcp

import (
    "context"
    "path"
    "runtime"

    "google.golang.org/api/option"

    "cloud.google.com/go/pubsub"
)

// PubsubClient is the GCP pubsub service client.
var PubsubClient *pubsub.Client

// Initialize initializes GCP client service using the environment.
func Initialize(env, projectName string) error 
    var err error
    ctx := context.Background()
    credentialOpt := option.WithCredentialsFile(getFilePathByEnv(env))

    PubsubClient, err = pubsub.NewClient(ctx, projectName, credentialOpt)
    return err


// GetTopic returns the specified topic in GCP pub/sub service and create it if it not exist.
func GetTopic(topicName string) (*pubsub.Topic, error) 
    topic := PubsubClient.Topic(topicName)
    ctx := context.Background()
    isTopicExist, err := topic.Exists(ctx)

    if err != nil 
        return topic, err
    

    if !isTopicExist 
        ctx = context.Background()
        topic, err = PubsubClient.CreateTopic(ctx, topicName)
    

    return topic, err


// GetSubscription returns the specified subscription in GCP pub/sub service and creates it if it not exist.
func GetSubscription(subName string, topic *pubsub.Topic) (*pubsub.Subscription, error) 
    sub := PubsubClient.Subscription(subName)
    ctx := context.Background()
    isSubExist, err := sub.Exists(ctx)

    if err != nil 
        return sub, err
    

    if !isSubExist 
        ctx = context.Background()
        sub, err = PubsubClient.CreateSubscription(ctx, subName, pubsub.SubscriptionConfigTopic: topic)
    

    return sub, err


func getFilePathByEnv(env string) string 
    _, filename, _, _ := runtime.Caller(1)

    switch env 
    case "local":
        return path.Join(path.Dir(filename), "local.json")
    case "development":
        return path.Join(path.Dir(filename), "development.json")
    case "staging":
        return path.Join(path.Dir(filename), "staging.json")
    case "production":
        return path.Join(path.Dir(filename), "production.json")
    default:
        return path.Join(path.Dir(filename), "local.json")
    

ma​​in.go

package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "runtime"
    "runtime/debug"
    "runtime/pprof"
    "time"

    "rpriambudi/pubsub-receiver/gcp"

    "cloud.google.com/go/pubsub"
    "github.com/go-chi/chi"
)

func main() 
    log.Fatal(http.ListenAndServe(":4001", Route()))


func Route() *chi.Mux 
    InitializeSubscription()
    chiRoute := chi.NewRouter()

    chiRoute.Route("/api", func(r chi.Router) 
        r.Get("/_count", func(w http.ResponseWriter, r *http.Request) 
            fmt.Fprintf(w, "Number of goroutines: %v", runtime.NumGoroutine())
        )

        r.Get("/_stack", getStackTraceHandler)
    )

    return chiRoute


func InitializeSubscription() 
    gcp.Initialize("local", "fifth-bonbon-277102")

    go pubsubHandler("test-topic-1", "test-topic-1-subs")
    go pubsubHandler("test-topic-2", "test-topic-2-subs")
    go pubsubHandler("test-topic-3", "test-topic-3-subs")
    // ....

    return


func getStackTraceHandler(w http.ResponseWriter, r *http.Request) 
    stack := debug.Stack()
    w.Write(stack)

    pprof.Lookup("goroutine").WriteTo(w, 2)


func pubsubHandler(topicID string, subscriptionID string) 
    topic, err := gcp.GetTopic(topicID)
    fmt.Println("topic: ", topic)
    if err != nil 
        fmt.Println("Failed get topic: ", err)
        return
    

    sub, err := gcp.GetSubscription(subscriptionID, topic)
    fmt.Println("subscription: ", sub)
    if err != nil 
        fmt.Println("Get subscription err: ", err)
        return
    

    err = sub.Receive(context.Background(), func(ctx context.Context, msg *pubsub.Message) 
        messageHandler(subscriptionID, ctx, msg)
    )
    if err != nil 
        fmt.Println("receive error: ", err)
    


func messageHandler(subscriptionID string, ctx context.Context, msg *pubsub.Message) 
    defer func() 
        if r := recover(); r != nil 
            fmt.Println("recovered from panic.")
            msg.Ack()
        
    ()

    fmt.Println("message of subscription: ", subscriptionID)
    fmt.Println("Message ID: ", string(msg.ID))
    fmt.Println("Message received: ", string(msg.Data))

    msg.Ack()
    time.Sleep(10 * time.Second)

当我在InitializeSubscription 中只有几个pubsubHandler 时,它的效果很好。但是当我在初始化函数(大约 10 个或更多处理程序)中添加更多 pubsubHandler 时,事情开始变得有趣。 ack 永远不会到达 pubsub 服务器,使得消息根本没有被确认(我已经在指标资源管理器中检查了AcknowledgeRequest,并且没有 ack 请求到来)。因此,消息不断返回给订阅者。另外,当我重新启动应用程序时,有时它不会收到任何消息,无论是新消息还是未确认消息。

我似乎找到了一种解决方法,方法是将NumGoroutines 设置为1pubsubHandler 函数中的每个订阅对象。

func pubsubHandler(topicID string, subscriptionID string) 
    ....

    sub, err := gcp.GetSubscription(subscriptionID, topic)
    
    ....

    sub.ReceiverSettings.NumGoroutines = 1
    err = sub.Receive(context.Background(), func(ctx context.Context, msg *pubsub.Message) 
        messageHandler(subscriptionID, ctx, msg)
    )

    ....

我的问题是,这是一种预期的行为吗?可能导致这些意外行为的根本原因是什么?或者我的实现是完全错误的,以达到预期的结果? (一个应用程序内的多个订阅)。或者在创建订阅处理程序时有什么最佳实践可以遵循?

据我了解,pubsub.Subscription 中的 Receive 函数本身就是一个阻塞代码。因此,当我尝试在 goroutines 中运行它时,可能会导致意想不到的副作用,尤其是如果我们不限制可以处理消息的 goroutines 的数量。我的推理有效吗?

感谢您的回答,祝您有美好的一天!

编辑1:将示例更新为完整代码,因为之前没有直接在main.go中导入pubsub客户端。

【问题讨论】:

【参考方案1】:

我认为问题可能在于您处理消息的速度(目前每条消息 10 秒)。如果您一次收到太多消息,您的客户可能会不知所措,从而导致消息积压。

我建议尝试使用 flow control settings 并将 ReceiveSettings.NumGoroutines 增加到高于默认值 10。如果您的发布率很高,您还可以增加 MaxOutstandingMessages,或者通过将其设置为 -1 来完全禁用限制。这告诉客户端一次保留更多消息,这是每个 Receive 调用共享的限制。

【讨论】:

感谢您的回答。因为我在time.Sleep之前发送了ack,所以每条消息的处理率不应该小于10秒吗?而且,如果我增加 ReceiveSettings.NumGoroutines 是否意味着将处理 each 消息的 goroutine 的数量会增加,或者那是生成的将处理 any的 goroutine 的最大数量> 收到消息? 通常,您希望在acking 之前完成大部分处理。由于您的句柄函数中有time.Sleep,因此该消息仍将保留直到函数返回。增加 NumGoroutines 会增加生成拉取消息的 goroutine 的数量。如果你想要更多的 goroutine 来处理消息,增加MaxOutstandingMessages

以上是关于GCP Pub/sub:使用 goroutine 让多个订阅者在一个应用程序中运行的主要内容,如果未能解决你的问题,请参考以下文章

gcp 云函数 pub/sub 主题死信

如何将 blob 文件发布到 GCP Pub/Sub?

通过 goroutine 异步发布到 google pub sub

GCP Pub/Sub 无法确认消息

协议缓冲区架构无效。导入“google/protobuf/any.proto”尚未加载:GCP/Pub-Sub

Java 应用程序找不到访问 GCP Pub/Sub 的凭据