如何消费来自多个主题的消息?

Posted

技术标签:

【中文标题】如何消费来自多个主题的消息?【英文标题】:How to consume messages from multiple topics? 【发布时间】:2019-06-17 15:27:13 【问题描述】:

我正在尝试使用assign() 方法来使用来自多个主题的消息。通过我的实现,有时我可以使用来自所有主题的消息,而其他时候只能使用一个主题。经过一番研究,我发现 Kafka 默认使用 Range 分配器。因此它不会总是分配所有分区。

对于我的用例,我应该能够使用所有主题和分区。

我尝试过设置 RoundRobin 分配器。但这并没有帮助

List<TopicPartition> topicPartitions = new ArrayList<>();
KafkaConsumer kafkaConsumer = new KafkaConsumer<>(consumerConfig);
for (String topic : topics) 
   topicPartitions.add(new TopicPartition(topic, 0);

kafkaConsumer.assign(topicPartitions);
ConsumerRecords<String, String> records = kafkaConsumer.poll(600);`

【问题讨论】:

【参考方案1】:

KafkaConsumer.assign 通常用于复杂的用例,您不仅要控制主题,还要控制您使用的分区。如果您只是想从多个主题(及其所有分区)中消费,您应该使用 KafkaConsumer.subscribe。

consumer.subscribe(Arrays.asList("topic1", "topic2"));

查看 javadoc javadoc,其中还显示了代码示例。

编辑:如果您需要控制分区分配,那么您确实需要使用 assign() 方法,但在您的(不完整的)代码示例中,您似乎分配了每个主题的分区 0 ;所以你只会消费来自分区 0 的消息。

如果您需要手动控制偏移量,您仍然可以使用订阅,但您可以禁用自动提交并使用 seek() 和 commitSync() 或 commitAsync() 来控制偏移量。

【讨论】:

我想根据主题+分区来控制分区和确认,因此使用assign()方法 subscribe() 方法不允许进行手动偏移控制。我必须使用assign()来实现手动偏移控制。【参考方案2】:

以下内容应该可以为您解决问题:

consumer.subscribe(Arrays.asList("mytopic1","mytopic2"), ConsumerRebalanceListener obj)
// or consumer.subscribe(Arrays.asList(topic1,topic2), new ConsumerRebalanceListener ..)

ConsumerRecords<String, String> records = consumer.poll(600);
for (TopicPartition partition : records.partitions()) 
    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
    for (ConsumerRecord<String, String> record : partitionRecords) 
        System.out.println(record.offset() + " -> " + record.value());
    

【讨论】:

我想通过确认来控制分区分配。这就是为什么我想使用 assign() 而不是 subscribe()

以上是关于如何消费来自多个主题的消息?的主要内容,如果未能解决你的问题,请参考以下文章

即使在成功连接并在 kafka 消费者控制台中获取消息后,也无法使用来自 kafka 主题的消息(使用 Python)[重复]

如何增加Spring Kafka Consumer每批消费的消息数?

mq消息消费

如何使用Apache Kafka进行内容过滤?

源码分析RocketMQ消息消费机制----消费者拉取消息机制

Apache Kafka:如何找出主题的消费者组?