2、kafka如何选定分区数量

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了2、kafka如何选定分区数量相关的知识,希望对你有一定的参考价值。

参考技术A 来自《kafka权威指南》第2章
num.partitions参数指定了新创建的主题将包含多少个分区。如果启用了主题自动创建功能(该功能默认是启用的),主题分区的个数就是该参数指定的值。该参数的默认值是 1。要注意,我们可以增加主题分区的个数,但不能减少分区的个数。所以,如果要让一个主题的分区个数少于 num.partitions 指定的值,需要手动创建该主题。
Kafka 集群通过分区对主题进行横向扩展,所以当有新的 broker 加入集群时,可以通过分区个数来实现集群的负载均衡。当然,这并不是说,在存在多个主题的情况下(它们分布在多个 broker 上),为了能让分区分布到所有 broker 上,主题分区的个数必须要大于 broker 的个数。不过,拥有大量消息的主题如果要进行负载分散,就需要大量的分区。

为主题选定分区数量并不是一件可有可无的事情,在进行数量选择时,需要考虑如下几个因素。

Quarkus Kafka Partitions 配置似乎只处理其中一个分区

【中文标题】Quarkus Kafka Partitions 配置似乎只处理其中一个分区【英文标题】:Quarkus Kafka Partitions config only seem to process one of the partitions 【发布时间】:2021-09-01 21:24:58 【问题描述】:

我正在尝试增加消费者的数量以匹配我们正在读取的 Kafka 主题的分区数量。有三个分区,所以我将传入消息的分区配置为三个,如下所示:

mp:
  messaging:
    incoming:
      topicA:
        auto:
          offset:
            reset: earliest
        topics: TOPIC-NAME
        connector: smallrye-kafka
        value:
          deserializer: org.apache.kafka.common.serialization.StringDeserializer
        group:
          id: consumer-group
        partitions: 3

但是,我已经运行该应用一段时间了,该应用似乎只处理分区 0 中的消息,而不处理分区 1 和 2 中的消息。我在日志中看到它创建了三个消费者。

2021-06-16 23:35:59,826 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=kafka-consumer-topicA-0, groupId=consumer-group] Successfully joined group with generation 15
2021-06-16 23:35:59,826 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (vert.x-kafka-consumer-thread-2) [Consumer clientId=kafka-consumer-topicA-2, groupId=consumer-group] Successfully joined group with generation 15
2021-06-16 23:35:59,826 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (vert.x-kafka-consumer-thread-1) [Consumer clientId=kafka-consumer-topicA-1, groupId=consumer-group] Successfully joined group with generation 15
2021-06-16 23:35:59,831 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (vert.x-kafka-consumer-thread-1) [Consumer clientId=kafka-consumer-topicA-1, groupId=consumer-group] Adding newly assigned partitions: TOPIC-NAME-1
2021-06-16 23:35:59,831 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=kafka-consumer-topicA-0, groupId=consumer-group] Adding newly assigned partitions: TOPIC-NAME-0
2021-06-16 23:35:59,831 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (vert.x-kafka-consumer-thread-2) [Consumer clientId=kafka-consumer-topicA-2, groupId=consumer-group] Adding newly assigned partitions: TOPIC-NAME-2

但是好像是在partition 0处理消息:

2021-06-16 23:38:00,141 INFO  [MessageListener] (vert.x-worker-thread-2) Partition number:0; offset: 1593011
2021-06-16 23:38:00,282 INFO  [MessageListener] (vert.x-worker-thread-1) Partition number:0; offset: 1593012
2021-06-16 23:38:00,412 INFO  [MessageListener] (vert.x-worker-thread-4) Partition number:0; offset: 1593013
2021-06-16 23:38:00,543 INFO  [MessageListener] (vert.x-worker-thread-6) Partition number:0; offset: 1593014
2021-06-16 23:38:00,692 INFO  [MessageListener] (vert.x-worker-thread-8) Partition number:0; offset: 1593015
2021-06-16 23:38:00,838 INFO  [MessageListener] (vert.x-worker-thread-10) Partition number:0; offset: 1593016
2021-06-16 23:38:00,977 INFO  [MessageListener] (vert.x-worker-thread-12) Partition number:0; offset: 1593017
2021-06-16 23:38:01,131 INFO  [MessageListener] (vert.x-worker-thread-14) Partition number:0; offset: 1593018
2021-06-16 23:38:01,272 INFO  [MessageListener] (vert.x-worker-thread-16) Partition number:0; offset: 1593019
2021-06-16 23:38:01,406 INFO  [MessageListener] (vert.x-worker-thread-18) Partition number:0; offset: 1593020
2021-06-16 23:38:01,535 INFO  [MessageListener] (vert.x-worker-thread-0) Partition number:0; offset: 1593021
2021-06-16 23:38:01,670 INFO  [MessageListener] (vert.x-worker-thread-3) Partition number:0; offset: 1593022
2021-06-16 23:38:01,799 INFO  [MessageListener] (vert.x-worker-thread-5) Partition number:0; offset: 1593023

监听类的代码sn-p如下:

    @Incoming("topicA")
    @Blocking
    public CompletionStage<Void> consume(final IncomingKafkaRecord<String, String> message) 

        log.info("Partition number:" + message.getPartition() + "; offset: " + message.getOffset());
    
        return message.ack();
    

这是小黑麦卡夫卡的错误吗?

【问题讨论】:

生产者是否对所有分区进行生产?可能它只写入分区 0。 是的。所有消息都在我正在阅读的主题的所有三个分区中。另一个应用正在向我正在消费的主题发布消息。 您需要创建一个唯一的消费者组 ID,这应该可以让您从所有分区中读取数据 如果我的理解是正确的,当将分区配置设置为 3 时,Small-Rye Reactive Messaging 库会在后台使用相同的组 ID 在单独的线程上创建三个 kafka 消费者。由于他们使用相同的组 id,消费者将从该主题读取每个分区。 似乎没有发生:) 【参考方案1】:

这些设置似乎将使用多个分区: 这将消耗所有消息。

mp.messaging.incoming.your-events.auto.offset.reset=earliest
mp.messaging.incoming.your-events.group.id=$quarkus.uuid

如果您使用的是发射器,则无需上述设置即可使用;

int partition = 0;
Message<Integer> message = Message.of(value)
            .addMetadata(OutgoingKafkaRecordMetadata.<String>builder()
                .withKey(key)
                .withPartition(partition) // change for each partition, 0, 1, 2..                
                .withTopic("your-events")
                .build());

【讨论】:

以上是关于2、kafka如何选定分区数量的主要内容,如果未能解决你的问题,请参考以下文章

kafka分区数量限制

kafka主题分区的数量和数据中不同键的数量

分区数量超过消费者时的 Apache Kafka 消息消费

kafka的分区和副本可以在一个服务器上么?

kafka专栏消费者组数据积压的查看与处理方法

如何为Kafka集群选择合适的Partitions数量