[已解决]Pubsub 推送订阅不确认消息

Posted

技术标签:

【中文标题】[已解决]Pubsub 推送订阅不确认消息【英文标题】:Pubsub push subscription not acknowledging messages 【发布时间】:2021-03-05 02:55:27 【问题描述】:

这是我的设置。

订阅 A 是一种推送订阅,可将消息发布到云运行部署。

该部署公开了一个 HTTP 端点,处理消息,将结果发布到主题 B,并对订阅 A 的 POST 请求响应 200。整个过程大约需要 1.5 秒。

因此,对于订阅 A 中的每条消息,我应该在主题 B 中得到 1 条消息。 这就是我的代码的样子

我的应用启动了 Express 服务器

const express = require('express');
const bodyParser = require('body-parser');

const _ = require('lodash');

const startBrowser = require('./startBrowser');
const tab = require('./tab');
const createMessage = require('./publishMessage');
const domain = 'https://example.com';

require('dotenv').config();

const app = express();
app.use(bodyParser.json());


const port = process.env.PORT || 8080;
app.listen(port, async () => 
  console.log('Listening on port', port);
);

所有魔法发生的端点

app.post('/', async (req, res) => 

  // Define the success and fail functions, that respond status 200 and 500 respectively
  const failed = () => res.status(500).send();
  const completed = async () => 
    const response = await res.status(200).send();
    if (response && res.writableEnded) 
      console.log('successfully responded 200');
    
  ;

  //Process the data coming from Subscription A
  let pubsubMessage = decodeBase64Json(req.body.message.data);
  let parsed =  await processor(pubsubMessage);
  
  //Post the processed data to topic B
  let messageId = await postParsedData(parsed);


  if (messageId) 
  // ACK the message once the data has been processed and posted to topic B.
    completed();

   else 
    console.log('Didnt get a message id');
    // failed();
  
);

//define the functions that post data to Topic B

const postParsedData = async (parsed) => 
  if (!_.isEmpty(parsed)) 
    const topicName = 'topic-B';
    const messageIdInternal = await createMessage(parsed, topicName);
  ;


    return messageId;
   else 
    console.log('Parsed is Empty');
    return null;
  
;

function decodeBase64Json(data) 
  return JSON.parse(Buffer.from(data, 'base64').toString());

执行时间大约需要 ~1.5 秒,我可以看到记录在 Cloud 上的成功响应每 ~1.5 秒运行一次。总计约 2400 条消息/小时(每个云运行实例)。

主题 B 正在以约 2400 条消息/小时的速度接收新消息,订阅 A 的确认率为约 200 条消息/小时,这导致消息被多次重新传递。

订阅 A 的确认截止日期为 600 秒。 Cloud run 的请求超时时间为 300 秒。

我已经尝试在消息发布到主题 B 之前,甚至在解析之前确认消息,但我得到了相同的结果。

编辑:添加待处理消息和已处理消息的屏幕截图。处理的消息比 ACKed 未决消息多得多。应该是 1:1

感谢您的帮助

解决方案 GCP 支持无法重现此错误。大量 Cloud Run 虚拟机不会发生这种情况。解决办法就是增加worker实例的数量

【问题讨论】:

【参考方案1】:

你需要await你的complete();函数调用。像这样

....
 if (messageId) 
  // ACK the message once the data has been processed and posted to topic B.
    await completed();

   else 
    console.log('Didnt get a message id');
    // failed();
  

【讨论】:

感谢您的回复。我已经尝试过这种方法,但我仍然看到同样的问题。实际上,“completed()”不需要是一个异步函数(这是我为了找到问题而改变的),并且在 status(200).send() 之前使用 async 没有任何效果

以上是关于[已解决]Pubsub 推送订阅不确认消息的主要内容,如果未能解决你的问题,请参考以下文章

Google PubSub 排序密钥问题,未启用消息排序

Google Cloud PubSub 不确认消息

如何在 Firebase Cloud Functions 中确认 PubSub 消息?

PubSub 不确认消息

Google Pubsub - 接收推送订阅的传递尝试

在 PubSub 订阅确认截止日期和重新传递的上下文中,未发送给订阅者的消息是啥意思?