spring-cloud-stream kafka 消费者并发
Posted
技术标签:
【中文标题】spring-cloud-stream kafka 消费者并发【英文标题】:spring-cloud-stream kafka consumer concurrency 【发布时间】:2016-06-21 15:28:03 【问题描述】:使用spring-cloud-stream的kafka binder,如何配置并发消息消费者(在单个消费者jvm中)?如果我理解正确,使用 kafka 时并发消息消费需要分区,但s-c-s docs 表示要使用分区,您需要通过 partitionKeyExpression 或 partitionKeyExtractorClass 在生产者中指定分区选择。 Kafka 文档提到了循环分区。
s-c-s 文档根本没有提到 spring.cloud.stream.bindings.*.concurrency,尽管这在我上面描述的用例中似乎很重要。使用生产者配置
spring:
cloud:
stream:
bindings:
customer-save:
destination: customer-save
group: customer-save
content-type: application/json
partitionCount: 3
和消费者配置
spring:
cloud:
stream:
bindings:
customer-save:
destination: customer-save
group: customer-save
content-type: application/x-java-object;type=foo.Customer
partitioned: true
concurrency: 3
我似乎得到了我想要的行为(至少在某种程度上)。我可以看到有时有 3 个消费者线程处于活动状态,尽管除了轮询之外似乎确实存在一些分区,因为有些消息似乎在等待繁忙的消费者线程并在该线程完成后被消耗。我认为这是因为消息被发送到同一个分区。
当我没有指定 partitionKeyExpression 或 partitionKeyExtractorClass 时,生产者是否使用了一些默认的密钥提取和分区策略?这是使用 kafka 设置 s-c-s 消费者的合适方法,您希望多个线程消费消息以增加消费者吞吐量?
【问题讨论】:
【参考方案1】:由于您的生产者没有分区(没有partitionKeyExpression
设置),生产者端将在 3 个分区上循环(如果这不是观察到的行为,请在 Git Hub 中打开票证)。如果你配置了partitionKeyExpression
,那么producer会根据配置的逻辑对数据进行有效的分区。
在消费者方面,我们确保线程/分区的亲和性,因为这是广受推崇的 Kafka 约定 - 我们确保给定分区上的消息按顺序处理 - 这可能解释了您观察到的行为。 如果将消息 A、B、C、D 发送到分区 0、1、2、0 - D 将不得不等到 A 被处理,即使有两个其他线程可用。
增加吞吐量的一个选择是过度分区(这是 Kafka 中相当典型的策略)。这将进一步分散消息,并增加消息发送到不同线程的机会。
如果您不关心排序,另一个增加吞吐量的选择是在下游异步处理消息:例如通过将输入通道桥接到 ExecutorChannel。
一般来说,partitioned
是指客户端接收分区数据的能力(Kafka 客户端始终是分区的,但此设置也适用于 Rabbit 和/或 Redis)。它与属性instanceIndex
和instanceCount
结合使用,以确保主题的分区在多个应用程序实例之间正确划分(另见http://docs.spring.io/spring-cloud-stream/docs/1.0.0.M4/reference/htmlsingle/index.html#_instance_index_and_instance_count)
【讨论】:
当消费者无法处理消息时,使用 kafka 桥接到 ExecutorChannel 会导致消息丢失吗?一旦侦听器容器将请求传递给执行器通道,单个消费者的当前 kafka 消息 ID 就会增加。 感谢您的澄清问题 - 是的,这是可能的。我们确实支持spring.cloud.stream.binder.kafka.default.autoCommitEnabled=false
属性来抑制自动提交,在这种情况下,向入站消息提供了Acknowledgment
标头,请参阅github.com/spring-projects/spring-integration-kafka/blob/master/… 和github.com/spring-projects/spring-integration-kafka/blob/master/…'。
即使在这种情况下,如果消息以相反的顺序处理,则后面的消息可能会确认较早消息的偏移量。
综上所述,为了提高吞吐量和确保可靠处理,建议增加分区数和消费者数,要么增加并发,要么使用多个应用程序实例。
要验证的一点是,实际上在该主题上创建了 3 个分区。如果不是,请提出问题。我们目前正在对物业管理系统进行一些修复,主要是为了简化配置过程,因此这可能是我们需要解决的问题。以上是关于spring-cloud-stream kafka 消费者并发的主要内容,如果未能解决你的问题,请参考以下文章
spring使用kafka的三种方式(listenercontainerstream)
spring使用kafka的三种方式(listenercontainerstream)
spring使用kafka的三种方式(listenercontainerstream)