Batching PubSub requests
【Posted】: 2018-08-10 18:27:51
【Question】:
The Node.js sample code for batching Pub/Sub requests looks like this:
// Imports the Google Cloud client library
const PubSub = require(`@google-cloud/pubsub`);
// Creates a client
const pubsub = new PubSub();
/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'your-topic';
// const data = JSON.stringify({ foo: 'bar' });
// const maxMessages = 10;
// const maxWaitTime = 10000;
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
pubsub
  .topic(topicName)
  .publisher({
    batching: {
      maxMessages: maxMessages,
      maxMilliseconds: maxWaitTime,
    },
  })
  .publish(dataBuffer)
  .then(results => {
    const messageId = results[0];
    console.log(`Message ${messageId} published.`);
  })
  .catch(err => {
    console.error('ERROR:', err);
  });
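It is not clear to me how to publish multiple messages at once using this sample. Could someone explain how to adjust this code so that it publishes several messages together?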
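【Comments】:
Indeed, the batching example forgot to publish more than one message..? :)
【Answer 1】:
If you want to send messages in batches, you need to keep a reference to the publisher and call publish on it multiple times. For example, you could change the code to the following: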
// Imports the Google Cloud client library
const PubSub = require(`@google-cloud/pubsub`);
// Creates a client
const pubsub = new PubSub();
const topicName = 'my-topic';
const maxMessages = 10;
const maxWaitTime = 10000;
const data1 = JSON.stringify({ foo: 'bar1' });
const data2 = JSON.stringify({ foo: 'bar2' });
const data3 = JSON.stringify({ foo: 'bar3' });
// Keep the publisher so that every publish() call feeds the same batch
const publisher = pubsub.topic(topicName).publisher({
  batching: {
    maxMessages: maxMessages,
    maxMilliseconds: maxWaitTime,
  },
});
// Logs the outcome of a single publish() promise
function handleResult(p) {
  p.then(results => {
    console.log(`Message ${results} published.`);
  })
  .catch(err => {
    console.error('ERROR:', err);
  });
}
// Publish three messages
handleResult(publisher.publish(Buffer.from(data1)));
handleResult(publisher.publish(Buffer.from(data2)));
handleResult(publisher.publish(Buffer.from(data3)));
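Batching is controlled by the maxMessages and maxMilliseconds properties. The former is the maximum number of messages to include in a batch. The latter is the maximum number of milliseconds to wait before publishing a batch. Together they trade off larger batches (which can be more efficient) against publish latency. If you publish many messages in quick succession, maxMilliseconds has little effect: as soon as 10 messages are ready, the client library sends a publish request to the Cloud Pub/Sub service. If publishing is sporadic or slow, however, a batch may be sent before it holds 10 messages.
In the sample code above, we call publish for three messages. That is not enough to fill a batch and trigger a send. So 10,000 milliseconds after the first call to publish, the three messages are sent to Cloud Pub/Sub as one batch.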
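To make the count-or-timeout rule concrete, here is a minimal toy batcher. It is only a sketch of the behaviour described above, not the code inside @google-cloud/pubsub: the SketchBatcher class, its sendBatch callback, and the fake message ids are all hypothetical, and the wait is shortened to 50 ms so the demo finishes quickly.

```javascript
// Illustrative only: a toy batcher, NOT the client library's implementation.
class SketchBatcher {
  constructor({ maxMessages, maxMilliseconds }, sendBatch) {
    this.maxMessages = maxMessages;
    this.maxMilliseconds = maxMilliseconds;
    this.sendBatch = sendBatch; // hypothetical transport: (payloads) => array of ids
    this.queue = [];
    this.timer = null;
  }

  // Returns a promise right away; it settles only when the batch is flushed.
  publish(payload) {
    const promise = new Promise((resolve, reject) => {
      this.queue.push({ payload, resolve, reject });
    });
    if (this.queue.length >= this.maxMessages) {
      this.flush(); // the batch is full: send now and ignore the timer
    } else if (this.timer === null) {
      // the first message of a new batch starts the clock
      this.timer = setTimeout(() => this.flush(), this.maxMilliseconds);
    }
    return promise;
  }

  // Sends whatever is queued and settles the matching promises.
  flush() {
    clearTimeout(this.timer);
    this.timer = null;
    const batch = this.queue;
    this.queue = [];
    if (batch.length === 0) return;
    try {
      const ids = this.sendBatch(batch.map((m) => m.payload));
      batch.forEach((m, i) => m.resolve(ids[i]));
    } catch (err) {
      batch.forEach((m) => m.reject(err));
    }
  }
}

// Demo: three messages never fill a batch of 10, so they wait for the timer.
let nextId = 1;
const batcher = new SketchBatcher(
  { maxMessages: 10, maxMilliseconds: 50 },
  (payloads) => payloads.map(() => String(nextId++)) // fake ids for the demo
);
const started = Date.now();
Promise.all(['bar1', 'bar2', 'bar3'].map((d) => batcher.publish(d))).then((ids) => {
  console.log(`ids ${ids.join(',')} delivered after ~${Date.now() - started} ms`);
});
```

All three publish calls return immediately, and in this sketch their promises settle together once the timer started by the first call fires.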
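【Comments】:
Thanks for the explanation. Now I understand.
When does the publish method return? Immediately, or when the batch is sent?
publish returns immediately. The promise it returns is fulfilled only after the batch has been sent and acknowledged by Cloud Pub/Sub.
So to wait for the results, do we need to keep track of
【Answer 2】:
Batching explained:
1. If the number of messages to publish reaches the count specified by maxMessages, the maxMilliseconds option is ignored and a batch of maxMessages messages is published immediately.
2. If the number of messages to publish does not reach the count specified by maxMessages, the messages are sent as a batch after waiting maxMilliseconds.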
Example for case 1:
import { PubSub } from '@google-cloud/pubsub';
// PUBSUB_PROJECT_ID is assumed to be defined elsewhere (for example, read from configuration)
async function publishMessage(topicName: string) {
  console.log(`[${new Date().toISOString()}] publishing messages`);
  const pubsub = new PubSub({ projectId: PUBSUB_PROJECT_ID });
  const topic = pubsub.topic(topicName, {
    batching: {
      maxMessages: 10,
      maxMilliseconds: 10 * 1000,
    },
  });
  const n = 12;
  const dataBufs: Buffer[] = [];
  for (let i = 0; i < n; i++) {
    const data = `message payload ${i}`;
    const dataBuffer = Buffer.from(data);
    dataBufs.push(dataBuffer);
  }
  // Each publish() promise resolves once the batch containing its message has been sent
  const results = await Promise.all(
    dataBufs.map((dataBuf, idx) =>
      topic.publish(dataBuf).then((messageId) => {
        console.log(`[${new Date().toISOString()}] Message ${messageId} published. index: ${idx}`);
        return messageId;
      })
    )
  );
  console.log('results:', results.toString());
}
Here we publish 12 messages. Output:
[2020-05-05T09:09:41.847Z] publishing messages
[2020-05-05T09:09:41.955Z] Message 36832 published. index: 0
[2020-05-05T09:09:41.955Z] Message 36833 published. index: 1
[2020-05-05T09:09:41.955Z] Message 36834 published. index: 2
[2020-05-05T09:09:41.955Z] Message 36835 published. index: 3
[2020-05-05T09:09:41.955Z] Message 36836 published. index: 4
[2020-05-05T09:09:41.955Z] Message 36837 published. index: 5
[2020-05-05T09:09:41.955Z] Message 36838 published. index: 6
[2020-05-05T09:09:41.955Z] Message 36839 published. index: 7
[2020-05-05T09:09:41.955Z] Message 36840 published. index: 8
[2020-05-05T09:09:41.955Z] Message 36841 published. index: 9
[2020-05-05T09:09:51.939Z] Message 36842 published. index: 10
[2020-05-05T09:09:51.939Z] Message 36843 published. index: 11
results: 36832,36833,36834,36835,36836,36837,36838,36839,36840,36841,36842,36843
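Note the timestamps. The first 10 messages are published immediately because they reach the count specified by maxMessages. The remaining 2 messages do not reach that count, so pubsub waits 10 seconds (maxMilliseconds) and then sends them.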
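As a quick check of that reading, the delays can be computed directly from the log. This is just arithmetic on three timestamps copied from the output above; the variable names are made up for illustration.

```javascript
// Timestamps copied from the log of the 12-message run above.
const started = Date.parse('2020-05-05T09:09:41.847Z');     // "publishing messages"
const firstBatch = Date.parse('2020-05-05T09:09:41.955Z');  // index 0 to 9
const secondBatch = Date.parse('2020-05-05T09:09:51.939Z'); // index 10 and 11

const firstBatchDelayMs = firstBatch - started;
const secondBatchDelayMs = secondBatch - started;
const gapBetweenBatchesMs = secondBatch - firstBatch;

console.log(firstBatchDelayMs);   // 108
console.log(secondBatchDelayMs);  // 10092
console.log(gapBetweenBatchesMs); // 9984
```

The full batch was acknowledged about 0.1 s after the run started, while the 2 leftover messages took just over 10 s, which matches the 10 * 1000 ms setting plus a little round-trip time.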
Example for case 2:
import { PubSub } from '@google-cloud/pubsub';
// PUBSUB_PROJECT_ID is assumed to be defined elsewhere (for example, read from configuration)
async function publishMessage(topicName: string) {
  console.log(`[${new Date().toISOString()}] publishing messages`);
  const pubsub = new PubSub({ projectId: PUBSUB_PROJECT_ID });
  const topic = pubsub.topic(topicName, {
    batching: {
      maxMessages: 10,
      maxMilliseconds: 10 * 1000,
    },
  });
  const n = 5;
  const dataBufs: Buffer[] = [];
  for (let i = 0; i < n; i++) {
    const data = `message payload ${i}`;
    const dataBuffer = Buffer.from(data);
    dataBufs.push(dataBuffer);
  }
  // Each publish() promise resolves once the batch containing its message has been sent
  const results = await Promise.all(
    dataBufs.map((dataBuf, idx) =>
      topic.publish(dataBuf).then((messageId) => {
        console.log(`[${new Date().toISOString()}] Message ${messageId} published. index: ${idx}`);
        return messageId;
      })
    )
  );
  console.log('results:', results.toString());
}
Here we send 5 messages, which is fewer than the count specified by maxMessages. So pubsub waits 10 seconds (maxMilliseconds) and then sends the 5 messages as one batch. This is the same situation as the 2 leftover messages in the first example. Output:
[2020-05-05T09:10:16.857Z] publishing messages
[2020-05-05T09:10:26.977Z] Message 36844 published. index: 0
[2020-05-05T09:10:26.977Z] Message 36845 published. index: 1
[2020-05-05T09:10:26.977Z] Message 36846 published. index: 2
[2020-05-05T09:10:26.977Z] Message 36847 published. index: 3
[2020-05-05T09:10:26.977Z] Message 36848 published. index: 4
results: 36844,36845,36846,36847,36848
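Both runs can be cross-checked against a small model of the two rules. The planBatches helper below is hypothetical and deliberately simplified: it assumes a batch's timer starts when its first message is queued, it ignores network latency and byte-size limits, and it is not taken from the client library.

```javascript
// Hypothetical helper: predicts when each batch would be flushed.
// arrivals: enqueue times in ms, sorted ascending.
function planBatches(arrivals, { maxMessages, maxMilliseconds }) {
  const batches = [];
  let current = null; // the open batch: { openedAt, count }
  for (const t of arrivals) {
    // Rule 2: the open batch timed out before this message arrived.
    if (current && t >= current.openedAt + maxMilliseconds) {
      batches.push({ sentAt: current.openedAt + maxMilliseconds, count: current.count });
      current = null;
    }
    if (!current) current = { openedAt: t, count: 0 };
    current.count += 1;
    // Rule 1: the batch is full, so it goes out immediately.
    if (current.count === maxMessages) {
      batches.push({ sentAt: t, count: current.count });
      current = null;
    }
  }
  // Whatever is left waits for its timer.
  if (current) {
    batches.push({ sentAt: current.openedAt + maxMilliseconds, count: current.count });
  }
  return batches;
}

const options = { maxMessages: 10, maxMilliseconds: 10 * 1000 };

// 12 messages queued at t = 0, as in the first example.
console.log(planBatches(new Array(12).fill(0), options));
// [ { sentAt: 0, count: 10 }, { sentAt: 10000, count: 2 } ]

// 5 messages queued at t = 0, as in the second example.
console.log(planBatches(new Array(5).fill(0), options));
// [ { sentAt: 10000, count: 5 } ]
```

The predicted send times line up with the logs: ten messages at once and two after 10 s in the first run, and all five after 10 s in the second.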