谷歌云 pubsub node.js 客户端与谷歌云功能不兼容

Posted

技术标签:

【中文标题】谷歌云 pubsub node.js 客户端与谷歌云功能不兼容【英文标题】:google cloud pubsub node.js client not compatible with google cloud functions 【发布时间】:2018-07-15 09:26:02 【问题描述】:

架构:

我们有一个使用 2 个发布订阅主题/订阅对的架构:

主题T1 由cronjob 定期触发(例如每5 分钟一次)。订阅S1 是我们云功能的触发器。 主题T2 用作我们的一项服务发布的后台作业的队列。订阅S2 由云函数在每次执行时读取,以服务排队的后台作业。

这使我们能够控制后台作业的服务频率,而与它们何时添加到队列无关。

云函数(由S1 触发)通过pulling 读取来自S2 的消息。它决定哪些后台作业已准备就绪,并在成功为作业提供服务后,它会确认相关的消息。未准备好或失败的作业不会被确认以供以后提供服务。

问题:

我们在使用来自 google 的官方 node.js pubusb client 时遇到问题:

    有时 ACK 的消息会重新出现(似乎是无限的)。我们验证了消息在 ACK 截止日期之前得到确认,并通过调查我们的日志确定我们正在致电 ack()。 有时在第一次执行后(重新部署函数后),后续执行永远不会收到新消息。我们可以通过验证堆栈驱动程序中未确认的消息计数或通过重新部署函数并查看消息得到服务来验证消息是否在订阅S2 中排队。

我们认为这是 google 的 node.js pubsub 客户端的问题。云功能文档明确指出not start background activities。但是,查看 node.js pubsub 客户端源代码,它显然使用超时在后台服务确认。

google的node.js pubsub客户端不兼容google cloud的功能吗?谷歌recommends accessing the service API's only when a client library does not exist or does not meet other needs。在云功能中运行客户端是否“其他需求”,需要我们使用服务 API 编写自己的客户端?

尝试的解决方法:

作为“解决方法”,我们尝试延迟执行 cloudfunction 的结束,以允许 node.js pubsub 客户端中的任何“后台”进程完成,但这并没有始终消除我们的问题。 pubsub 客户端似乎对云功能不友好,并且无法从云功能执行之间的停止中恢复。

2018 年 2 月 22 日更新

我写了an article on our blog,详细描述了为什么我们以这种方式使用 PubSub,以及我们如何解决 node.js pubsub 客户端与云功能不兼容的事实。

【问题讨论】:

我做了一个类似的GitHub issue here 【参考方案1】:

我遇到了同样的问题,我想更好地控制.ack()。查看 google 的 nodejs 库,可以选择重构 ack() 以返回承诺,以便函数可以等待 ack() 完成。

Subscriber.prototype.ack_ = function(message) 
  var breakLease = this.breakLease_.bind(this, message);

  this.histogram.add(Date.now() - message.received);

  if (this.writeToStreams_ && this.isConnected_()) 
    this.acknowledge_(message.ackId, message.connectionId).then(breakLease);
    return;
  

  this.inventory_.ack.push(message.ackId);
  this.setFlushTimeout_().then(breakLease);
;

【讨论】:

我们更改了架构,以消除从云功能中从 pubsub 中提取消息的需要。我在答案中添加的链接说 Google 不支持它:'-( 最初的用例是使用 pubsub 作为队列,我们​​已将其替换为 firebase firestore 中的集合。它使我们可以更好地控制队列并与云功能很好地配合使用。【参考方案2】:

来自 node.js pubsub 客户端 confirmed 的开发人员表示不支持使用客户端从云函数中提取消息。

替代方法是使用service APIs。但是,当尝试从订阅中提取 所有 消息时,REST API 具有 their own caveats。

【讨论】:

【参考方案3】:

你是如何触发你的函数的?

根据docs,如果您的函数正在使用 pubsub 消息,那么您应该使用 pubsub 触发器。使用 pubsub 触发器时,不需要该库。只需在函数结束时调用callback(),pubsub 消息就会被正确确认。

对于您打算做的事情,我认为您当前的架构不是正确的选择。

我会使用cron task 将您的第一步移至 Google App Engine,并让此任务只需将消息从 T2 移动到 T1,让函数具有触发器 S2 并处理消息。

因此,您的工作将发布在 T2,并且您将拥有一个 GAE 应用程序,该应用程序具有由 cron 任务触发的拉订阅 S2,并且此应用程序会将消息重新发布到 T1。然后,您的函数将由订阅 S1 到主题 T1 触发,并在消息中运行作业,避免导入 pubsub 库的额外处理,并按预期使用产品。

此外,我不确定您最初是如何将您的工作发布到该主题的,但 Task Queues 是用于速率限制任务的一个很好的 GAE(和 product-agnostic in Alpha)选项。

仅用于此目的的 GAE 应用(设置 1 个最大实例)将位于 always free limit 内,因此成本不会显着增加。

【讨论】:

感谢您周到的回答!我们的架构似乎是独一无二的,但优点是我们将后台作业 (T2) 的发布与服务时 (T1) 分离。这使我们能够批量服务作业以节省资源。我们正在避免 GAE 不增加任何新的维护负担(将所有实现保留在 Kubernetes + 云功能中)。我们使用 Kubernetes CronJobs 发布到T1。最后,此解决方案将所有实现保留在 node.js 中。不是 python、go 或 java,它们似乎是 GAE 和任务队列的客户端。这是对 PubSub 的错误使用吗? 您可以直接在 k8s 中使用 Cloud Tasks 的 alpha 版本(您可以通过this link 申请加入)。它处于 alpha 阶段,所以也许不是 prod 中最棒的想法,但你可以研究一下。您也可以使用 cron 任务,但我仍然会发布到 T1 -> k8s 中的 cron 作业拉取并发布到 T2 -> GCF 与 T2 上的 pubsub 触发器 好的,目标是让 GCF 不必从 PubSub 中提取消息?这是因为您认为让 GCF 拉取 PubSub 消息会继续导致问题? 我认为 GCF 中可能有一种解决方法,方法是阻止回调直到消息被确认(我可以尝试运行一些测试)。我不认为它应该在未来导致(更多)问题,但我认为这种调节消息消耗率的方式是手动实现 Cloud Tasks 试图完成的任务的复杂方式。最好的方法可能是使用 Cloud Tasks(将您的作业作为任务直接排队),但请记住,它仍处于 alpha 阶段,并且始终存在发生重大更改的风险。 (话虽如此,GCF 也仍处于测试阶段,所以......) 我写了an article on our blog,更详细地描述了为什么我们要在云函数中提取发布订阅消息。它可能会为我的问题增加一些额外的清晰度。

以上是关于谷歌云 pubsub node.js 客户端与谷歌云功能不兼容的主要内容,如果未能解决你的问题,请参考以下文章

Neo4j 在谷歌云 pubsub 订阅中写入事务恐慌

谷歌云与Elastic和MongoDB等开源公司合作 挑战亚马逊

腾讯云与谷歌云达成合作?产品思维转变亦是王道!

将 github 存储库与谷歌云存储桶同步

Ticketek将MongoDB Atlas与谷歌云相结合, 助力分析应用

text 与谷歌云工具合作intellij。