JS 中的 Gcloud pubsub worker
Posted
技术标签:
【中文标题】JS 中的 Gcloud pubsub worker【英文标题】:Gcloud pubsub worker in JS 【发布时间】:2019-09-05 14:45:09 【问题描述】:我正在使用 setInterval 在 JS 中开发一个 worker,每 10 秒检查一次是否有新消息。我想知道以下代码的方法是否正确。你认为我会有一些性能问题吗?我在 Kubernetes 中将它作为单个 pod 运行。
const messageHandler = message =>
sendRequest(message, (message, error, response, body) =>
if (!error && response.statusCode == 200)
message.ack();
);
;
subscription.on(`message`, messageHandler);
const timeout = 10;
setInterval(() =>
subscription.removeListener('message', messageHandler);
subscription.on(`message`, messageHandler);
, timeout * 1000);
【问题讨论】:
【参考方案1】:我可能在这里遗漏了一些东西,但如果您希望您的订阅者“永远”启动并收听,那么我完全不明白您为什么需要删除您的听众。
那么为什么不这样呢:
const messageHandler = async message =>
try
await doSomethingWith(message);
message.ack();
catch (err)
console.error(`Error handling message: $message.id: $err.message`);
message.nack();
;
// Listen for new messages
subscription.on(`message`, messageHandler);
【讨论】:
【参考方案2】:不是每 10 秒删除和注册你的 messageHandler 侦听器,以下是两种更好的方法:
-
对传入消息有一个长时间运行的消息侦听器,直到自创建侦听器以来达到“x”时间。
为传入消息设置一个长时间运行的消息侦听器,并且仅在自收到最后一条消息后经过“x”时间后才关闭订阅者。
在消息延迟的极少数情况下,订阅者可能会在收到所有已发布的消息之前关闭。但是,如果您将超时设置为一个很大的数字(以分钟为单位),这不太可能发生。
有关创建订阅者客户端的示例,请参阅client library。您可以将超时修改为更大的数字。客户端库使用StreamingPull,它维护一个开放的双向流并在消息可用时接收消息,以实现最大吞吐量和低延迟。
【讨论】:
这两种方法的问题是我只收到传入的消息。我不会收到我没有用“ack”发送的消息。这就是我使用 setInterval 的原因。当我再次开始运行订阅者时,我会收到所有消息。 此外,我想让订阅者一直运行,以防收到新消息。 对于您未确认/确认的消息,它们将在ack-deadline 传递后重新发送给您的订阅者。您可以在创建订阅时配置初始确认截止日期(初始默认设置为 10 秒)。因此,如果您的订阅者仍在运行,它将接收重新传递的消息(您尚未确认的消息)。 客户端库通过增加用于重新传递的 ack-deadline 来自动延长消息的租约。可以通过maxExtension 下的FlowControlOptions 配置在重新交付之前允许延长最后期限的最长时间。 此外,为了防止订阅者不堪重负,您可以使用流量控制来限制订阅者接收消息的速率。以上是关于JS 中的 Gcloud pubsub worker的主要内容,如果未能解决你的问题,请参考以下文章
Google Cloud PubSub - 似乎无法获取主题
使用 GCloud 模拟器的 Google Cloud PubSub V1
gcloud alpha pubsub 订阅寻求方法未找到异常