GCP PubSub - 使用 orderingKey (Nodejs) 进行批处理

Posted

技术标签:

【中文标题】GCP PubSub - 使用 orderingKey (Nodejs) 进行批处理【英文标题】:GCP PubSub - Batching with orderingKey (Nodejs) 【发布时间】:2021-09-05 19:48:52 【问题描述】:

使用 google PubSub 批处理和 orderingKey 时出现不稳定的行为。 如果我使用批量 PubSub 而不使用 orderingKey,代码将按预期工作:

        console.log( `Start: [$new Date().toISOString()]` );
    
        const array = [ ...Array( 5 ).keys() ];
        const topic = pubSubClient.topic( 'topic_test', 
            batching: 
                maxMessages: array.length,
                maxMilliseconds: 10 * 1000,
            ,
         );

        await Promise.all(
            array.map( async ( item ) => 
                const messageId = await topic.publish( Buffer.from( JSON.stringify( item ) ) );
                console.log( `[$new Date().toISOString()] Message $messageId published. id: $item` );
             )
        );

我无需等待即可立即获得结果:

Start: [2021-06-22T07:47:01.187Z]
[2021-06-22T07:47:01.187Z] Message 2578553779341346 published. id: 0
[2021-06-22T07:47:01.188Z] Message 2578553779341347 published. id: 1
[2021-06-22T07:47:01.188Z] Message 2578553779341348 published. id: 2
[2021-06-22T07:47:01.188Z] Message 2578553779341349 published. id: 3
[2021-06-22T07:47:01.188Z] Message 2578553779341350 published. id: 4

但如果我将 orderingKey 与批处理一起添加:

        console.log( `Start: [$new Date().toISOString()]` );
        
        const array = [ ...Array( 5 ).keys() ];
        const topic = pubSubClient.topic( 'topic_test', 
            enableMessageOrdering: true,
            batching: 
                maxMessages: array.length,
                maxMilliseconds: 10 * 1000,
            ,
         );

        await Promise.all(
            array.map( async ( item ) => 
                const messageId = await topic.publishMessage( 
                    data: Buffer.from( JSON.stringify( item ) ),
                    orderingKey: item,
                 );
                console.log( `[$new Date().toISOString()] Message $messageId published. id: $item` );
             )
        );

批次仅在 10 秒后发送(以 maxMilliseconds 为单位),即使之前已满足 maxMessages 条件:

Start: [2021-06-22T07:54:31.287Z]
[2021-06-22T07:54:41.995Z] Message 2578621698696149 published. id: 1
[2021-06-22T07:54:41.995Z] Message 2578554178769363 published. id: 4
[2021-06-22T07:54:41.996Z] Message 2578622069514679 published. id: 3
[2021-06-22T07:54:41.996Z] Message 2578621502622110 published. id: 0
[2021-06-22T07:54:42.058Z] Message 2578621667693212 published. id: 2

我该如何解决?

【问题讨论】:

【参考方案1】:

使用排序键时,按排序键进行批处理。因此,如果每条消息都有一个唯一的排序键,就像您的情况一样,您将始终等待maxMilliseconds。通常,排序键应该有一些重叠,例如,它应该是用户 ID 或数据库中一行的唯一键。使用整个 item 作为排序键是出乎意料的。

【讨论】:

您好,我仅在示例中使用整个商品作为订购键。感谢您的回复。

以上是关于GCP PubSub - 使用 orderingKey (Nodejs) 进行批处理的主要内容,如果未能解决你的问题,请参考以下文章

从现有的 GCP pubsub 订阅中消费

如何使用 GCP 在 pubsub 模型中一次向所有订阅者发送消息

GCP PubSub - 使用 orderingKey (Nodejs) 进行批处理

在 GCP 上使用 Pubsub 时如何解决身份验证范围不足的问题

使用 GCP pubsub 的 Spring Cloud Stream 消费者的并发设置

GCP - 从 PubSub 到 BigQuery 的消息