如何消费来自多个主题的消息?
Posted
技术标签:
【中文标题】如何消费来自多个主题的消息?【英文标题】:How to consume messages from multiple topics? 【发布时间】:2019-06-17 15:27:13 【问题描述】:我正在尝试使用assign()
方法来使用来自多个主题的消息。通过我的实现,有时我可以使用来自所有主题的消息,而其他时候只能使用一个主题。经过一番研究,我发现 Kafka 默认使用 Range 分配器。因此它不会总是分配所有分区。
对于我的用例,我应该能够使用所有主题和分区。
我尝试过设置 RoundRobin 分配器。但这并没有帮助
List<TopicPartition> topicPartitions = new ArrayList<>();
KafkaConsumer kafkaConsumer = new KafkaConsumer<>(consumerConfig);
for (String topic : topics)
topicPartitions.add(new TopicPartition(topic, 0);
kafkaConsumer.assign(topicPartitions);
ConsumerRecords<String, String> records = kafkaConsumer.poll(600);`
【问题讨论】:
【参考方案1】:KafkaConsumer.assign 通常用于复杂的用例,您不仅要控制主题,还要控制您使用的分区。如果您只是想从多个主题(及其所有分区)中消费,您应该使用 KafkaConsumer.subscribe。
consumer.subscribe(Arrays.asList("topic1", "topic2"));
查看 javadoc javadoc,其中还显示了代码示例。
编辑:如果您需要控制分区分配,那么您确实需要使用 assign() 方法,但在您的(不完整的)代码示例中,您似乎分配了每个主题的分区 0 ;所以你只会消费来自分区 0 的消息。
如果您需要手动控制偏移量,您仍然可以使用订阅,但您可以禁用自动提交并使用 seek() 和 commitSync() 或 commitAsync() 来控制偏移量。
【讨论】:
我想根据主题+分区来控制分区和确认,因此使用assign()方法 subscribe() 方法不允许进行手动偏移控制。我必须使用assign()来实现手动偏移控制。【参考方案2】:以下内容应该可以为您解决问题:
consumer.subscribe(Arrays.asList("mytopic1","mytopic2"), ConsumerRebalanceListener obj)
// or consumer.subscribe(Arrays.asList(topic1,topic2), new ConsumerRebalanceListener ..)
ConsumerRecords<String, String> records = consumer.poll(600);
for (TopicPartition partition : records.partitions())
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords)
System.out.println(record.offset() + " -> " + record.value());
【讨论】:
我想通过确认来控制分区分配。这就是为什么我想使用 assign() 而不是 subscribe()以上是关于如何消费来自多个主题的消息?的主要内容,如果未能解决你的问题,请参考以下文章
即使在成功连接并在 kafka 消费者控制台中获取消息后,也无法使用来自 kafka 主题的消息(使用 Python)[重复]
如何增加Spring Kafka Consumer每批消费的消息数?