批处理 PubSub 请求

Posted

技术标签:

【中文标题】批处理 PubSub 请求【英文标题】:Batching PubSub requests 【发布时间】:2018-08-10 18:27:51 【问题描述】:

批处理 pubsub 请求的 NODEJS 示例代码如下所示:

// Imports the Google Cloud client library
const PubSub = require(`@google-cloud/pubsub`);

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'your-topic';
// const data = JSON.stringify( foo: 'bar' );
// const maxMessages = 10;
// const maxWaitTime = 10000;

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

pubsub
  .topic(topicName)
  .publisher(
    batching: 
      maxMessages: maxMessages,
      maxMilliseconds: maxWaitTime,
    ,
  )
  .publish(dataBuffer)
  .then(results => 
    const messageId = results[0];
    console.log(`Message $messageId published.`);
  )
  .catch(err => 
    console.error('ERROR:', err);
  );

对我来说,不清楚如何使用此示例同时发布多条消息。有人能解释一下如何调整这段代码,以便它可以同时发布多条消息吗?

【问题讨论】:

确实 - 批处理示例忘记发布多个消息..? :) 【参考方案1】:

如果您想批量发送消息,那么您需要保留发布者并多次调用publish。例如,您可以将代码更改为以下内容:

// Imports the Google Cloud client library
const PubSub = require(`@google-cloud/pubsub`);

// Creates a client
const pubsub = new PubSub();


const topicName = 'my-topic';
const maxMessages = 10;
const maxWaitTime = 10000;
const data1 = JSON.stringify( foo: 'bar1' );
const data2 = JSON.stringify( foo: 'bar2' );
const data3 = JSON.stringify( foo: 'bar3' );

const publisher = pubsub.topic(topicName).publisher(
    batching: 
      maxMessages: maxMessages,
      maxMilliseconds: maxWaitTime,
    ,
  )

function handleResult(p) 
  p.then(results => 
    console.log(`Message $results published.`);
  )
  .catch(err => 
    console.error('ERROR:', err);
  );


// Publish three messages
handleResult(publisher.publish(Buffer.from(data1)));
handleResult(publisher.publish(Buffer.from(data2)));
handleResult(publisher.publish(Buffer.from(data3)));

消息的批处理由maxMessagesmaxMilliseconds 属性处理。前者表示批处理中包含的最大消息数。后者表示等待发布批次的最大毫秒数。这些属性在更大的批次(可能更有效)与发布延迟之间进行权衡。如果您要快速发布许多消息,那么maxMilliseconds 属性不会有太大影响;一旦有 10 条消息准备就绪,客户端库就会向 Cloud Pub/Sub 服务发出发布请求。但是,如果发布是零星的或缓慢的,那么可能会在十条消息之前发送一批消息。

在上面的示例代码中,我们在三个消息上调用publish。这不足以填满一批并发送。因此,在第一次调用 publish 后 10,000 毫秒,这三个消息将作为批处理发送到 Cloud Pub/Sub。

【讨论】:

感谢您的解释。现在我明白了。 发布方法什么时候返回?立即发送还是批量发送时? publish 将立即返回。只有在批量发送并由 Cloud Pub/Sub 确认后,发布方法返回的未来才会实现。 那么要等待结果,我们是否需要跟踪 期货? 您将需要跟踪发布调用返回的每个未来,这将不受 maxMessages 的限制。 maxMessages 指示何时将批处理发送到服务器。等待响应的服务器可能有多个未完成的批次。【参考方案2】:

批处理说明:

    如果要发布的消息达到maxMessages指定的数量,则忽略maxMilliseconds选项,立即批量发布等于maxMessages数量的消息;

    如果要发布的消息没有达到maxMessages指定的数量,等待maxMilliseconds时间后,批量发送这些消息

例如对于 1:

async function publishMessage(topicName) 
  console.log(`[$new Date().toISOString()] publishing messages`);
  const pubsub = new PubSub( projectId: PUBSUB_PROJECT_ID );
  const topic = pubsub.topic(topicName, 
    batching: 
      maxMessages: 10,
      maxMilliseconds: 10 * 1000,
    ,
  );

  const n = 12;
  const dataBufs: Buffer[] = [];
  for (let i = 0; i < n; i++) 
    const data = `message payload $i`;
    const dataBuffer = Buffer.from(data);
    dataBufs.push(dataBuffer);
  

  const results = await Promise.all(
    dataBufs.map((dataBuf, idx) =>
      topic.publish(dataBuf).then((messageId) => 
        console.log(`[$new Date().toISOString()] Message $messageId published. index: $idx`);
        return messageId;
      )
    )
  );
  console.log('results:', results.toString());

现在,我们将发布 12 条消息。执行结果:

[2020-05-05T09:09:41.847Z] publishing messages
[2020-05-05T09:09:41.955Z] Message 36832 published. index: 0
[2020-05-05T09:09:41.955Z] Message 36833 published. index: 1
[2020-05-05T09:09:41.955Z] Message 36834 published. index: 2
[2020-05-05T09:09:41.955Z] Message 36835 published. index: 3
[2020-05-05T09:09:41.955Z] Message 36836 published. index: 4
[2020-05-05T09:09:41.955Z] Message 36837 published. index: 5
[2020-05-05T09:09:41.955Z] Message 36838 published. index: 6
[2020-05-05T09:09:41.955Z] Message 36839 published. index: 7
[2020-05-05T09:09:41.955Z] Message 36840 published. index: 8
[2020-05-05T09:09:41.955Z] Message 36841 published. index: 9
[2020-05-05T09:09:51.939Z] Message 36842 published. index: 10
[2020-05-05T09:09:51.939Z] Message 36843 published. index: 11
results: 36832,36833,36834,36835,36836,36837,36838,36839,36840,36841,36842,36843

请注意时间戳。前 10 条消息将立即发布,因为它们是 maxMessages 指定的数量。然后,因为其余 2 条消息没有达到maxMessages 指定的数量。所以 pubsub 会等待 10 秒(maxMilliseconds),然后发送剩下的 2 条消息。

例如2:

async function publishMessage(topicName) 
  console.log(`[$new Date().toISOString()] publishing messages`);
  const pubsub = new PubSub( projectId: PUBSUB_PROJECT_ID );
  const topic = pubsub.topic(topicName, 
    batching: 
      maxMessages: 10,
      maxMilliseconds: 10 * 1000,
    ,
  );

  const n = 5;
  const dataBufs: Buffer[] = [];
  for (let i = 0; i < n; i++) 
    const data = `message payload $i`;
    const dataBuffer = Buffer.from(data);
    dataBufs.push(dataBuffer);
  

  const results = await Promise.all(
    dataBufs.map((dataBuf, idx) =>
      topic.publish(dataBuf).then((messageId) => 
        console.log(`[$new Date().toISOString()] Message $messageId published. index: $idx`);
        return messageId;
      )
    )
  );
  console.log('results:', results.toString());

现在,我们将发送 5 条消息,它们没有达到maxMessages 指定的数量。所以 pubsub 会等待 10 秒(maxMilliseconds)。等待 10 秒后(maxMilliseconds),pubsub 会批量发送这 5 条消息。此场景与第一个示例中剩余的 2 条消息相同。执行结果:

[2020-05-05T09:10:16.857Z] publishing messages
[2020-05-05T09:10:26.977Z] Message 36844 published. index: 0
[2020-05-05T09:10:26.977Z] Message 36845 published. index: 1
[2020-05-05T09:10:26.977Z] Message 36846 published. index: 2
[2020-05-05T09:10:26.977Z] Message 36847 published. index: 3
[2020-05-05T09:10:26.977Z] Message 36848 published. index: 4
results: 36844,36845,36846,36847,36848

【讨论】:

以上是关于批处理 PubSub 请求的主要内容,如果未能解决你的问题,请参考以下文章

Google PubSub / Gmail Webhook:发送电子邮件时始终从 PubSub 接收多个 POST 请求

通过http请求将Google pubsub作为JSON发布

监控和刷新 PubSub 批处理发布者队列

GCP PubSub - 使用 orderingKey (Nodejs) 进行批处理

“处理组件 pubsub 错误:组件 pubsub 的初始化超时超过 5 秒”

每个用户的 GCP PubSub(或 GCP 任务)同步处理