Cloud Run PubSub 高延迟
Posted
技术标签:
【中文标题】Cloud Run PubSub 高延迟【英文标题】:Cloud Run PubSub high latency 【发布时间】:2021-12-29 17:23:55 【问题描述】:我正在构建一个微服务应用程序,其中包含许多使用 Node.js 构建并在 Cloud Run 上运行的微服务。我以几种不同的方式使用 PubSub:
-
用于每天的流式数据。负责从不同广告服务(Facebook Ads、LinkedIn Ads 等)收集分析数据的微服务使用 PubSub 将数据流式传输到负责将数据上传到 Google BigQuery 的微服务。还有一些服务通过将数据拆分成更小的数据块,从 CRM 和其他服务流式传输更高负载的数据(> 1 Gb)。
用于微服务之间关于不需要立即响应的不同事件的消息传递。
之前,我在使用 PubSub 时遇到了一些微不足道的延迟。我知道它是an open issue,考虑到长达几秒的延迟和低消息吞吐量。但就我而言,我们谈论的是几分钟的延迟。
另外,我偶尔会收到一条错误消息
发布时收到错误:API google.pubsub.v1.Publisher 在收到任何响应之前的总超时时间超过 60000 毫秒。
在这种情况下,消息根本没有发送或被高度延迟。
这就是我的代码的样子。
const subscriptions = new Map<string, Subscription>();
const topics = new Map<string, Topic>();
const listenForMessages = async (
subscriptionName: string,
func: ListenerCallback,
secInit = 300,
secInter = 300
) =>
let logger = new TestLogger("LISTEN_FOR_MSG");
let init = true;
const _setTimeout = () =>
let timer = setTimeout(() =>
console.log(`Subscription to $subscriptionName cancelled`);
subscription.removeListener("message", messageHandler);
, (init ? secInit : secInter) * 1000);
init = false;
return timer;
;
const messageHandler = async (msg: Message) =>
msg.ack();
await func(JSON.parse(msg.data.toString()));
// wait for next message
timeout = _setTimeout();
;
let subscription: Subscription;
if (subscriptions.has(subscriptionName))
subscription = subscriptions.get(subscriptionName);
else
subscription = pubSubClient.subscription(subscriptionName);
subscriptions.set(subscriptionName, subscription);
let timeout = _setTimeout();
subscription.on("message", messageHandler);
console.log(`Listening for messages: $subscriptionName`);
;
const publishMessage = async (
data: WithAnyProps,
topicName: string,
options?: PubOpt
) =>
const serializedData = JSON.stringify(data);
const dataBuffer = Buffer.from(serializedData);
try
let topic: Topic;
if (topics.has(topicName))
topic = topics.get(topicName);
else
topic = pubSubClient.topic(topicName,
batching:
maxMessages: options?.batchingMaxMessages,
maxMilliseconds: options?.batchingMaxMilliseconds,
,
);
topics.set(topicName, topic);
let msg =
data: dataBuffer,
attributes: options.attributes,
;
await topic.publishMessage(msg);
console.log(`Publishing to $topicName`);
catch (err)
console.error(`Received error while publishing: $err.message`);
;
listenerForMessage 函数由 HTTP 请求触发。
我已经检查过的内容
-
PubSub 客户端只在函数外创建一次。
主题和订阅被重复使用。
我让每个容器至少运行一个实例,以消除冷启动引发延迟的可能性。
我尝试增加容器的 CPU 和内存容量。
batchingMaxMessages 和 batchingMaxMilliseconds 设置为 1
我检查了最新版本的@google-cloud/pubsub 是否已安装。
备注
-
高延迟问题仅发生在云环境中。通过本地测试,一切正常。
两种环境有时都会出现超时错误。
【问题讨论】:
您能分享一下您是如何运行代码的吗?您的网络服务器和整体逻辑。 @guillaumeblaquiere,我不确定我是否完全了解您的问题,但我会尽力为您提供相关信息。所有代码都在完全托管的 Cloud Run 容器上的 Node.js 中运行。我有一个服务,我们称之为 PipelineService,包含有关现有数据管道的信息:源和目标类型、连接 ID、管道状态(活动/暂停)、上次数据传输日期等。我也有多个服务,让我们比如 FacebookService 等,包含有关源连接、访问令牌等的信息。 @guillaumeblaquiere,还有一个 GBQService 持有 google 访问令牌和目标表的地址。 Cloud Scheduler 每天调用一次 PipelineService。反过来,PipelineService 唤醒源和目标服务并激活 PubSub 侦听器。他们开始执行一些预热操作并监听 PubSub 消息,其中包含有关要检索的数据的时间范围和地址的说明。检索到数据后,通过消息将其发送到 GBQService,GBQService 再将数据上传到 Google BigQuery。 @guillaumeblaquiere,如果在任何给定点进程失败并且无法自动恢复,则有关失败原因的消息将发送回 PipelineService 并保存到数据库中。否则,发送成功确认。每个服务在没有新消息的特定时间后删除一个侦听器并关闭。 好的,请原谅我不够精确:我想知道调用消息发布部分的代码是什么。使用 Cloud Run,您必须拥有一个网络服务器。应在其上调用端点,并在该端点中调用您的发布函数。我想了解这个调用链。 【参考方案1】:问题在于我对 Cloud Run Container 生命周期的理解。我曾经在 PubSub 在后台工作时发送 HTTP 响应 202。发送响应后,容器切换到空闲状态,在我的日志中看起来像高延迟。
【讨论】:
以上是关于Cloud Run PubSub 高延迟的主要内容,如果未能解决你的问题,请参考以下文章
从 Memorystore Redis PubSub 消息触发 Cloud Run API
如何使用 Google Cloud PubSub 和 Run 处理资源密集型长时间运行的任务?