使用 GCP pubsub 的 Spring Cloud Stream 消费者的并发设置

Posted

技术标签:

【中文标题】使用 GCP pubsub 的 Spring Cloud Stream 消费者的并发设置【英文标题】:Concurrency settings for Spring Cloud Stream consumer with GCP pubsub 【发布时间】:2020-06-01 23:44:05 【问题描述】:

我的应用程序正在使用绑定到 GCP pubsub 的 Spring Cloud Stream 接收消息。我正在试验以下配置参数:

spring.cloud.gcp.pubsub.subscriber.executor-threads
spring.cloud.stream.bindings.<channelName>.consumer.concurrency

应用程序有 3 个不同的通道,每个通道都定义了一个消费者组。应用程序的多个实例将在生产环境中运行(在 kubernetes 上)。

我正在尝试找到要配置的正确设置,以确定每个应用实例中可以并行处理多少条消息。我一直在尝试在本地机器上调整上述两个参数,但似乎只有 executor-threads 有任何效果。如果我将其设置为 5,并将一堆消息泵入系统,我会在消息处理日志中看到 5 个线程。如果我将它增加到 10 个,我会在那里看到 10 个线程。然而,concurrency 参数似乎没有做任何事情,无论它设置为 1 还是 10 或其他什么。

如果有的话,这些参数之间的关系是什么?并发参数是否仅用于 Rabbit 或 Kafka 等其他 binder?

谢谢。

【问题讨论】:

【参考方案1】:
spring.cloud.gcp.pubsub.subscriber.executor-threads
spring.cloud.gcp.pubsub.subscriber.parallel-pull-count

Spring Cloud GCP 中的这两个配置选项与 Pub/Sub 的 Java client library 中的 setExecutorThreadCountsetParallelPullCount 相同。他们控制着行为 您的Pub/Sub 订阅客户。

parallel-pull-count 确定您的订阅者客户端打开多少个流来接收消息,executor-threads 确定您的订阅者客户端使用多少线程来处理消息回调(其中消息被确认,因此 Pub/Sub 服务不会将它们发送给您再次)。例如,当您将parallel-pull-count 设置为2 并将executor-threads 设置为4 时,消息将通过2 个流进入,8 个线程将忙于处理它们。请参阅Concurrency Control 了解更多信息。

spring.cloud.stream.bindings.<channelName>.consumer.concurrency

您是对的,此配置选项仅适用于某些活页夹。为了在你的消费者组中设置并发,你的生产者必须被分区。但是,Pub/Sub 的 Spring Cloud Stream 绑定器当前执行not support partitioning,因此您将无法设置并发。

我正在尝试找到正确的设置进行配置,以确定每个应用实例中可以并行处理多少条消息。

为了实现最大吞吐量,我将使用上面的前两个配置选项,同时还要记住,处理时间超过其确认截止日期的消息将过期,Pub/Sub 服务将重新传递它们。

【讨论】:

以上是关于使用 GCP pubsub 的 Spring Cloud Stream 消费者的并发设置的主要内容,如果未能解决你的问题,请参考以下文章

GCP PubSub Spring Boot 重复提取消息

在 GCP PubSub 和 Spring Boot 中捕获发布错误

在 Spring Cloud GCP pubsub 中创建特定于消息通道的线程

Spring Boot GCP:将 PubSub 应用程序部署到 App Engine 标准环境时出现“Google 凭据”异常

如何使用 gsutil 向 GCP 存储桶添加 pubsub 通知

如何将 spring gcp PubSubTemplate 连接到本地实例?