如何使用 GCP 在 pubsub 模型中一次向所有订阅者发送消息

Posted

技术标签:

【中文标题】如何使用 GCP 在 pubsub 模型中一次向所有订阅者发送消息【英文标题】:How to send message to all subscribers at once in pubsub model using GCP 【发布时间】:2021-03-16 02:53:30 【问题描述】:

使用google云平台实现pubsub模型,使用函数创建topic、subscriber、publish和pullmsg函数。

func pullMsgs(projectID, subID string, jsonPath string) error 
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, projectID, option.WithCredentialsFile(jsonPath))
    if err != nil 
        return fmt.Errorf("pubsub.NewClient: %v", err)
    

    // Consume 10 messages.
    var mu sync.Mutex
    received := 0
    sub := client.Subscription(subID)
    cctx, cancel := context.WithCancel(ctx)
    err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) 
        mu.Lock()
        defer mu.Unlock()
        // fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
        fmt.Println("Got message: n", string(msg.Data))
        msg.Ack()
        received++
        if received == 10 
            cancel()
        
    )
    if err != nil 
        return fmt.Errorf("Receive: %v", err)
    
    return nil

pullmsg 函数使用订阅 ID 从发布者获取发布的消息。 假设该模型有 3 个特定主题的订阅者。如果发布者为该主题发布消息。 pullmsg 函数必须执行 3 次才能为所有订阅者获取该消息。 有没有什么方法可以一次将发布的消息发送给所有订阅者。

【问题讨论】:

按照设计,每个订阅(以及每个订阅者)都会收到同一消息的不同副本。您能否澄清这 3 个订阅者是否处于同一进程中?如果是这样,是否有 1 个订阅者不够的原因? 【参考方案1】:

我不明白您为什么要一次从 3 个订阅中提取。因为这些消息可能不会在同一时间/大约同时到达。因此,如果您想合并这些消息,您的代码必须等到它收到所有消息。这并不是 Pub/Sub 的真正目的。另一方面,如果您确实想要合并这三个消息,我建议您为每个订阅创建单独的一段代码。

话虽如此,您可以使用 同步 拉取机制或 异步 拉取机制来拉取所有消息。如果您不希望您的代码被阻塞以便它基本上同时监听所有订阅,您可以使用异步拉取。

代码流可以是:

创建非阻塞订阅者函数。 当 x 条消息到达时让这个 pull 函数停止(中断一个 while 循环)。 创建三个传递您的三个主题 ID 的函数实例。 为您处理创建另一个函数。

更多信息可以在here找到。

【讨论】:

以上是关于如何使用 GCP 在 pubsub 模型中一次向所有订阅者发送消息的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 gsutil 向 GCP 存储桶添加 pubsub 通知

在 GCP 上使用 Pubsub 时如何解决身份验证范围不足的问题

如何从 GCP 函数 GUI 调用 PubSub 函数

GCP - 如何添加关于发送到 pubsub 死信队列的消息数量的警报?

如何在cakephp中一次发送多封电子邮件

如何将 GCP Pubsub 订阅的消息转发到另一个主题?