JS 中的 Gcloud pubsub worker

Posted

技术标签:

【中文标题】JS 中的 Gcloud pubsub worker【英文标题】:Gcloud pubsub worker in JS 【发布时间】:2019-09-05 14:45:09 【问题描述】:

我正在使用 setInterval 在 JS 中开发一个 worker,每 10 秒检查一次是否有新消息。我想知道以下代码的方法是否正确。你认为我会有一些性能问题吗?我在 Kubernetes 中将它作为单个 pod 运行。

const messageHandler = message => 
  sendRequest(message, (message, error, response, body) => 
    if (!error && response.statusCode == 200) 
      message.ack();
    
  );
;

subscription.on(`message`, messageHandler);

const timeout = 10;
setInterval(() => 
  subscription.removeListener('message', messageHandler);
  subscription.on(`message`, messageHandler);
, timeout * 1000);

【问题讨论】:

【参考方案1】:

我可能在这里遗漏了一些东西,但如果您希望您的订阅者“永远”启动并收听,那么我完全不明白您为什么需要删除您的听众。

那么为什么不这样呢:

const messageHandler = async message => 
    try 
        await doSomethingWith(message);
        message.ack();
     catch (err) 
        console.error(`Error handling message: $message.id: $err.message`);
        message.nack();
    
;


// Listen for new messages
subscription.on(`message`, messageHandler);

【讨论】:

【参考方案2】:

不是每 10 秒删除和注册你的 messageHandler 侦听器,以下是两种更好的方法:

    对传入消息有一个长时间运行的消息侦听器,直到自创建侦听器以来达到“x”时间。 为传入消息设置一个长时间运行的消息侦听器,并且仅在自收到最后一条消息后经过“x”时间后才关闭订阅者。

在消息延迟的极少数情况下,订阅者可能会在收到所有已发布的消息之前关闭。但是,如果您将超时设置为一个很大的数字(以分钟为单位),这不太可能发生。

有关创建订阅者客户端的示例,请参阅client library。您可以将超时修改为更大的数字。客户端库使用StreamingPull,它维护一个开放的双向流并在消息可用时接收消息,以实现最大吞吐量和低延迟。

【讨论】:

这两种方法的问题是我只收到传入的消息。我不会收到我没有用“ack”发送的消息。这就是我使用 setInterval 的原因。当我再次开始运行订阅者时,我会收到所有消息。 此外,我想让订阅者一直运行,以防收到新消息。 对于您未确认/确认的消息,它们将在ack-deadline 传递后重新发送给您的订阅者。您可以在创建订阅时配置初始确认截止日期(初始默认设置为 10 秒)。因此,如果您的订阅者仍在运行,它将接收重新传递的消息(您尚未确认的消息)。 客户端库通过增加用于重新传递的 ack-deadline 来自动延长消息的租约。可以通过maxExtension 下的FlowControlOptions 配置在重新交付之前允许延长最后期限的最长时间。 此外,为了防止订阅者不堪重负,您可以使用流量控制来限制订阅者接收消息的速率。

以上是关于JS 中的 Gcloud pubsub worker的主要内容,如果未能解决你的问题,请参考以下文章

Google Cloud PubSub - 似乎无法获取主题

使用 GCloud 模拟器的 Google Cloud PubSub V1

gcloud alpha pubsub 订阅寻求方法未找到异常

gcloud pubsub 订阅 pull 经常报空消息列表

PubSub 死字

谷歌云跟踪 + Gcloud 登录日志查看器