使用 spring kafka 中的注释为每个主题单独的 Kafka 侦听器

Posted

技术标签:

【中文标题】使用 spring kafka 中的注释为每个主题单独的 Kafka 侦听器【英文标题】:Separate Kafka listener for each topic using annotations in spring kafka 【发布时间】:2020-03-11 10:05:45 【问题描述】:

我的应用程序正在监听多个 kafka 主题。现在,我的监听器如下所示,属性文件中的my.topics 包含逗号分隔的 kafka 主题列表

@KafkaListener(topics = ["#'\$my.topics'.split(',')"], groupId = "my.group", containerFactory = "myKafkaFactory")
fun genericMessageListener(myRequest: MyRequest, ack: Acknowledgment) 
   //do Something with myRequest
    ack.acknowledge()

我的ConcurrentKafkaListenerContainerFactory

@Bean
fun myKafkaFactory(): ConcurrentKafkaListenerContainerFactory<String, MyRequest> 
    val factory = ConcurrentKafkaListenerContainerFactory<String, MyRequest>()
    factory.consumerFactory = DefaultKafkaConsumerFactory(configProps(), StringDeserializer(), MyRequestDeserializer())
    factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
    return factory

有没有一种方法可以为每个主题动态创建一个单独的消费者,这样当我在my.topics 的列表中添加一个主题时,spring 会自动为该主题创建一个单独的消费者。

我现在面临的问题是,如果任何主题中的一条消息出现问题,其他主题中的消息也会受到影响。

在高层次上,我正在寻找类似的东西

@Bean
fun myKafkaFactory(): ConcurrentKafkaListenerContainerFactory<String, MyRequest> 
    val factory = ConcurrentKafkaListenerContainerFactory<String, MyRequest>()
    factory.consumerFactory = DefaultKafkaConsumerFactory(configProps(), StringDeserializer(), MyRequestDeserializer())
    factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
    factory.isOneConsumerPerTopic(true)
    return factory

这样factory.isOneConsumerPerTopic(true) 将确保为数组中的每个主题创建一个单独的消费者。

我确实通过了How to create separate Kafka listener for each topic dynamically in springboot?。我正在寻找更“清洁”的解决方案。 :)

【问题讨论】:

【参考方案1】:

您可以将自定义 PartitionAssignor 添加到 Kafka 消费者配置中。

将容器并发设置为(至少)主题数量,并让您的分配者将每个主题的分区分配给特定的消费者。

/**
 * This interface is used to define custom partition assignment for use in
 * @link org.apache.kafka.clients.consumer.KafkaConsumer. Members of the consumer group subscribe
 * to the topics they are interested in and forward their subscriptions to a Kafka broker serving
 * as the group coordinator. The coordinator selects one member to perform the group assignment and
 * propagates the subscriptions of all members to it. Then @link #assign(Cluster, Map) is called
 * to perform the assignment and the results are forwarded back to each respective members
 *
 * In some cases, it is useful to forward additional metadata to the assignor in order to make
 * assignment decisions. For this, you can override @link #subscription(Set) and provide custom
 * userData in the returned Subscription. For example, to have a rack-aware assignor, an implementation
 * can use this user data to forward the rackId belonging to each member.
 */
public interface PartitionAssignor 

你可以从AbstractPartitionAssignor开始。

请参阅spring-kafka documentation。

在监听多个主题时,默认的分区分布可能不是你所期望的。例如,...

【讨论】:

以上是关于使用 spring kafka 中的注释为每个主题单独的 Kafka 侦听器的主要内容,如果未能解决你的问题,请参考以下文章

Spring Cloud Stream Kafka 消费者模式

每个应用程序的 Apache Kafka 主题

Kafka Kstream 和 Spring @KafkaListener 有何不同?

kafka 消息的 Thrift 序列化 - 每个结构的单个主题

java——spring boot集成kafka——broker主题分区副本——概念理解

Spark:并行处理多个kafka主题