Cloud Run PubSub 高延迟

Posted

技术标签:

【中文标题】Cloud Run PubSub 高延迟【英文标题】:Cloud Run PubSub high latency 【发布时间】:2021-12-29 17:23:55 【问题描述】:

我正在构建一个微服务应用程序,其中包含许多使用 Node.js 构建并在 Cloud Run 上运行的微服务。我以几种不同的方式使用 PubSub:

    用于每天的流式数据。负责从不同广告服务(Facebook Ads、LinkedIn Ads 等)收集分析数据的微服务使用 PubSub 将数据流式传输到负责将数据上传到 Google BigQuery 的微服务。还有一些服务通过将数据拆分成更小的数据块,从 CRM 和其他服务流式传输更高负载的数据(> 1 Gb)。 用于微服务之间关于不需要立即响应的不同事件的消息传递。

之前,我在使用 PubSub 时遇到了一些微不足道的延迟。我知道它是an open issue,考虑到长达几秒的延迟和低消息吞吐量。但就我而言,我们谈论的是几分钟的延迟。

另外,我偶尔会收到一条错误消息

发布时收到错误:API google.pubsub.v1.Publisher 在收到任何响应之前的总超时时间超过 60000 毫秒。

在这种情况下,消息根本没有发送或被高度延迟。

这就是我的代码的样子。

const subscriptions = new Map<string, Subscription>();
const topics = new Map<string, Topic>();

const listenForMessages = async (
  subscriptionName: string,
  func: ListenerCallback,
  secInit = 300,
  secInter = 300
) => 
  let logger = new TestLogger("LISTEN_FOR_MSG");
  let init = true;
  const _setTimeout = () => 
    let timer = setTimeout(() => 
      console.log(`Subscription to $subscriptionName cancelled`);
      subscription.removeListener("message", messageHandler);
    , (init ? secInit : secInter) * 1000);
    init = false;
    return timer;
  ;

  const messageHandler = async (msg: Message) => 
    msg.ack();
    await func(JSON.parse(msg.data.toString()));
    // wait for next message
    timeout = _setTimeout();
  ;

  let subscription: Subscription;

  if (subscriptions.has(subscriptionName)) 
    subscription = subscriptions.get(subscriptionName);
   else 
    subscription = pubSubClient.subscription(subscriptionName);
    subscriptions.set(subscriptionName, subscription);
  

  let timeout = _setTimeout();

  subscription.on("message", messageHandler);
  console.log(`Listening for messages: $subscriptionName`);
;

const publishMessage = async (
  data: WithAnyProps,
  topicName: string,
  options?: PubOpt
) => 
  const serializedData = JSON.stringify(data);
  const dataBuffer = Buffer.from(serializedData);
  try 
    let topic: Topic;
    if (topics.has(topicName)) 
      topic = topics.get(topicName);
     else 
      topic = pubSubClient.topic(topicName, 
        batching: 
          maxMessages: options?.batchingMaxMessages,
          maxMilliseconds: options?.batchingMaxMilliseconds,
        ,
      );
      topics.set(topicName, topic);
    
    let msg = 
      data: dataBuffer,
      attributes: options.attributes,
    ;

    await topic.publishMessage(msg);
    console.log(`Publishing to $topicName`);
   catch (err) 
    console.error(`Received error while publishing: $err.message`);
  
;

listenerForMessage 函数由 HTTP 请求触发。

我已经检查过的内容

    PubSub 客户端只在函数外创建一次。 主题和订阅被重复使用。 我让每个容器至少运行一个实例,以消除冷启动引发延迟的可能性。 我尝试增加容器的 CPU 和内存容量。 batchingMaxMessagesbatchingMaxMilliseconds 设置为 1 我检查了最新版本的@google-cloud/pubsub 是否已安装。

备注

    高延迟问题仅发生在云环境中。通过本地测试,一切正常。 两种环境有时都会出现超时错误。

【问题讨论】:

您能分享一下您是如何运行代码的吗?您的网络服务器和整体逻辑。 @guillaumeblaquiere,我不确定我是否完全了解您的问题,但我会尽力为您提供相关信息。所有代码都在完全托管的 Cloud Run 容器上的 Node.js 中运行。我有一个服务,我们称之为 PipelineService,包含有关现有数据管道的信息:源和目标类型、连接 ID、管道状态(活动/暂停)、上次数据传输日期等。我也有多个服务,让我们比如 FacebookService 等,包含有关源连接、访问令牌等的信息。 @guillaumeblaquiere,还有一个 GBQService 持有 google 访问令牌和目标表的地址。 Cloud Scheduler 每天调用一次 PipelineService。反过来,PipelineService 唤醒源和目标服务并激活 PubSub 侦听器。他们开始执行一些预热操作并监听 PubSub 消息,其中包含有关要检索的数据的时间范围和地址的说明。检索到数据后,通过消息将其发送到 GBQService,GBQService 再将数据上传到 Google BigQuery。 @guillaumeblaquiere,如果在任何给定点进程失败并且无法自动恢复,则有关失败原因的消息将发送回 PipelineService 并保存到数据库中。否则,发送成功确认。每个服务在没有新消息的特定时间后删除一个侦听器并关闭。 好的,请原谅我不够精确:我想知道调用消息发布部分的代码是什么。使用 Cloud Run,您必须拥有一个网络服务器。应在其上调用端点,并在该端点中调用您的发布函数。我想了解这个调用链。 【参考方案1】:

问题在于我对 Cloud Run Container 生命周期的理解。我曾经在 PubSub 在后台工作时发送 HTTP 响应 202。发送响应后,容器切换到空闲状态,在我的日志中看起来像高延迟。

【讨论】:

以上是关于Cloud Run PubSub 高延迟的主要内容,如果未能解决你的问题,请参考以下文章

在 PubSub 主题上使用 Cloud Run

从 Memorystore Redis PubSub 消息触发 Cloud Run API

如何使用 Google Cloud PubSub 和 Run 处理资源密集型长时间运行的任务?

GCP Pubsub 低消息/秒的高延迟

如何使用 pub sub 从 Global EventArc 触发 Cloud Run

排空或清除 Google Cloud pubsub 主题的最佳做法 [关闭]