在 Spring Cloud GCP pubsub 中创建特定于消息通道的线程

Posted

技术标签:

【中文标题】在 Spring Cloud GCP pubsub 中创建特定于消息通道的线程【英文标题】:Make threads specific to a message channel in spring cloud GCP pubsub 【发布时间】:2021-07-23 23:30:57 【问题描述】:

我有一个运行 GCP PubSub 消息传递的 Spring Cloud 应用程序。我有 2 个消息入站频道订阅了 2 个不同的订阅者。我在应用程序的负载/压力测试期间面临的问题是,特定的线程数设置如下:

spring.cloud.gcp.pubsub.subscriber.executor-threads: 350
spring.cloud.gcp.pubsub.subscriber.parallel-pull-count: 2
spring.cloud.gcp.pubsub.subscriber.max-acknowledgement-threads: 700

当通道 1 的消息拉取的进程繁忙时,我没有足够的线程让通道 2 拉取消息。解决方案是限制/配置每个通道的线程数。我发现很难弄清楚这一点。请帮帮我!以下是我所指的频道:

@Bean
@ServiceActivator(inputChannel = "pubsubInputChannel1")
public MessageHandler extractionMessageReceiver() 
    return message -> 
        // do something
    ;


@Bean
@ServiceActivator(inputChannel = "pubsubInputChannel2")
public MessageHandler extractionMessageReceiver() 
    return message -> 
        // do something
    ;

注意,订阅者线程会一直忙到某个特定进程被消息拉取结束。

【问题讨论】:

我熟悉 Cloud PubSub,但不熟悉 Spring。似乎问题主要与 Spring Service Activator 节流有关。也许将问题更改为“如何通过 inputChannel 限制 ServiceActivator 激活?” 【参考方案1】:

我遇到了以下问题:当有很多消息并且它们排队时,pubsub 的执行器健康检查停止工作。我的假设是所有执行程序线程都忙于处理消息,并且检查运行到最后期​​限超出异常。 以下流控制属性帮助我解决了这个问题:

Config Description
spring.cloud.gcp.pubsub.[subscriber,publisher.batching].flow-control.max-outstanding-element-count Maximum number of outstanding elements to keep in memory before enforcing flow control
spring.cloud.gcp.pubsub.[subscriber,publisher.batching].flow-control.max-outstanding-request-bytes Maximum number of outstanding bytes to keep in memory before enforcing flow control.

来自https://docs.spring.io/spring-cloud-gcp/docs/1.1.0.M1/reference/html/_spring_cloud_gcp_for_pub_sub.html

当我将最大未完成元素数设置为 100 时,一切正常。

我认为未完成的消息是通过流来提取的。通过上面的属性,我们可以控制不是所有的消息都被一次处理。相反,我们将它们分成例如每条 100 条消息。也许它也会在频道之间切换。引用https://medium.com/google-cloud/things-i-wish-i-knew-about-google-cloud-pub-sub-part-2-b037f1f08318:

请注意,流式拉取只能确保尽最大努力进行流量控制。假设您注意到您的应用程序在任何一个时间段内只能处理 100 条消息,因此您将最大未完成消息设置为 100。客户端将在收到 100 条消息后暂停,这在大多数情况下都有效。但是,如果您随后在单个发布批次中发布 500 条消息,客户端将一次收到所有 500 条消息,但一次只能处理 100 条,这可能导致过期消息积压不断增加。这是因为流式拉取无法从单个发布批次中拆分消息。为避免这种情况,请在发布时增加订阅者数量或减少批量大小以匹配订阅者消息处理能力。

这些参数能解决你的问题吗?

【讨论】:

虽然此链接可能会回答问题,但最好在此处包含答案的基本部分并提供链接以供参考。如果链接页面发生更改,仅链接答案可能会失效。 - From Review 您的答案可以通过额外的支持信息得到改进。请edit 添加更多详细信息,例如引用或文档,以便其他人可以确认您的答案是正确的。你可以找到更多关于如何写好答案的信息in the help center。

以上是关于在 Spring Cloud GCP pubsub 中创建特定于消息通道的线程的主要内容,如果未能解决你的问题,请参考以下文章

使用 GCP pubsub 的 Spring Cloud Stream 消费者的并发设置

spring cloud stream和gcp pub sub,binder问题

Go GCP Cloud PubSub 不批量发布消息

从 Cloud Function 发布到 GCP PubSub 的正确方法是啥?

GKE 上的 Google Cloud PubSub:尝试检索凭据时出现 FileNotFoundException

在 GCP PubSub 和 Spring Boot 中捕获发布错误