Kafka3.x核心速查手册二客户端使用篇-4消息路由机制

Posted roykingw

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka3.x核心速查手册二客户端使用篇-4消息路由机制相关的知识,希望对你有一定的参考价值。

​ 了解前面两个机制后,你自然会想到一个问题。就是消息如何进行路由?也即是两个相关联的问题。

  • Producer会根据消息的key选择Partition,具体如何通过key找Partition呢?
  • 一个消费者组会共同消费一个Topic下的多个Partition中的同一套消息副本,那Consumer节点是不是可以决定自己消费哪些Partition的消息呢?

​ 这两个问题其实都不难,你只要在几个Config类中稍微找一找就能找到答案。

首先,在Producer中,可以指定一个Partitioner来对消息进行分配。

	public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
    private static final String PARTITIONER_CLASS_DOC = "A class to use to determine which partition to be send to when produce the records. Available options are:" +
        "<ul>" +
            "<li><code>org.apache.kafka.clients.producer.internals.DefaultPartitioner</code>: The default partitioner. " +
        "This strategy will try sticking to a partition until the batch is full, or <code>linger.ms</code> is up. It works with the strategy:" +
                "<ul>" +
                    "<li>If no partition is specified but a key is present, choose a partition based on a hash of the key</li>" +
                    "<li>If no partition or key is present, choose the sticky partition that changes when the batch is full, or <code>linger.ms</code> is up.</li>" +
                "</ul>" +
            "</li>" +
            "<li><code>org.apache.kafka.clients.producer.RoundRobinPartitioner</code>: This partitioning strategy is that " +
        "each record in a series of consecutive records will be sent to a different partition(no matter if the 'key' is provided or not), " +
        "until we run out of partitions and start over again. Note: There's a known issue that will cause uneven distribution when new batch is created. " +
        "Please check KAFKA-9965 for more detail." +
            "</li>" +
            "<li><code>org.apache.kafka.clients.producer.UniformStickyPartitioner</code>: This partitioning strategy will " +
        "try sticking to a partition(no matter if the 'key' is provided or not) until the batch is full, or <code>linger.ms</code> is up." +
            "</li>" +
        "</ul>" +
        "<p>Implementing the <code>org.apache.kafka.clients.producer.Partitioner</code> interface allows you to plug in a custom partitioner.";

​ 这里就说明了Kafka是通过一个Partitioner接口的具体实现来决定一个消息如何根据Key分配到对应的Partition上的。你甚至可以很简单的实现一个自己的分配策略。

​ 消息发送者主要实现了两种分配策略,RoundRobinPartitioner是在各个Partition中进行轮询发送。DefaultPartitioner和UniformStickyPartitioner主要都是基于sticky策略,这种策略会尽量在Producer与Partition之间建议一种平均分配并且稳定的对应关系。

​ 比如说有两个同组的Producer,Producer1和Producer2,然后Topic下有五个Partition,partition0~4。那么UniformStickyPartitioner策略就会先将五个partition尽量平均的分配给Producer。比如Producer1对应Partiton0,Partition2,Partition4。Producer2对应Partition1,Partition3。那往后这一批消息中,Producer1和Producer2都会尽量固定的发往自己对应的Partition。

UniformStickyPartitioner是不按照Key进行分区的。DefaultPartitioner则会先按照Key分区。没有Key就按照Sticky分区。

然后,在Consumer中,可以指定一个PARTITION_ASSIGNMENT_STRATEGY分区分配策略,决定如何在多个Consumer实例和多个Partitioner之间建立关联关系。

 public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";
    private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "A list of class names or class types, " +
        "ordered by preference, of supported partition assignment strategies that the client will use to distribute " +
        "partition ownership amongst consumer instances when group management is used. Available options are:" +
        "<ul>" +
        "<li><code>org.apache.kafka.clients.consumer.RangeAssignor</code>: Assigns partitions on a per-topic basis.</li>" +
        "<li><code>org.apache.kafka.clients.consumer.RoundRobinAssignor</code>: Assigns partitions to consumers in a round-robin fashion.</li>" +
        "<li><code>org.apache.kafka.clients.consumer.StickyAssignor</code>: Guarantees an assignment that is " +
        "maximally balanced while preserving as many existing partition assignments as possible.</li>" +
        "<li><code>org.apache.kafka.clients.consumer.CooperativeStickyAssignor</code>: Follows the same StickyAssignor " +
        "logic, but allows for cooperative rebalancing.</li>" +
        "</ul>" +
        "<p>The default assignor is [RangeAssignor, CooperativeStickyAssignor], which will use the RangeAssignor by default, " +
        "but allows upgrading to the CooperativeStickyAssignor with just a single rolling bounce that removes the RangeAssignor from the list.</p>" +
        "<p>Implementing the <code>org.apache.kafka.clients.consumer.ConsumerPartitionAssignor</code> " +
        "interface allows you to plug in a custom assignment strategy.</p>";

​ 同样,Kafka内置了一些实现方式,在通常情况下也都是最优的选择。你也可以实现自己的分配策略。

​ 从上面介绍可以看到Kafka默认提供了三种消费者的分区分配策略

  • range策略: 比如一个Topic有10个Partiton(partition 0~9) 一个消费者组下有三个Consumer(consumer1~3)。Range策略就会将分区0~3分给一个Consumer,4~6给一个Consumer,7~9给一个Consumer。
  • round-robin策略:轮询分配策略,可以理解为在Consumer中一个一个轮流分配分区。比如0,3,6,9分区给一个Consuerm,1,4,7分区给一个Consuerm,然后2,5,8给一个Consumer
  • sticky策略:粘性策略。这个策略有两个原则:1、在开始分区时,尽量保持分区的分配均匀。比如按照Range策略分(这一步实际上是随机的)。2、分区的分配尽可能的与上一次分配的保持一致。比如在range分区的情况下,第三个Consumer的服务宕机了,那么按照sticky策略,就会保持consumer1和consumer2原有的分区分配情况。然后将consumer3分配的7~9分区尽量平均的分配到另外两个consumer上。这种粘性策略可以很好的保持Consumer的数据稳定性。

渔与鱼:实现分配策略本身并不难,但是更重要的是需要考虑分配算法的执行效率,尤其是在高并发,海量消息场景下的执行效率。官方默认提供的生产者端的DefaultPartitioner以及消费者端的RangeAssignor+CooperativeStickyAssignor分配策略,在大部分场景下都是非常高效的算法。深入理解这些算法,虽然对开发的帮助并不会很大,但是对于你深入理解MQ场景,以及借此去横向对比理解其他的MQ产品,都是非常有帮助的。

以上是关于Kafka3.x核心速查手册二客户端使用篇-4消息路由机制的主要内容,如果未能解决你的问题,请参考以下文章

Kafka3.x核心速查手册二客户端使用篇-4消息路由机制

Kafka3.x核心速查手册二客户端使用篇-3消息序列化机制

Kafka3.x核心速查手册二客户端使用篇-3消息序列化机制

Kafka3.x核心速查手册二客户端使用篇-7生产者消息事务

Kafka3.x核心速查手册二客户端使用篇-7生产者消息事务

Kafka3.x核心速查手册二客户端使用篇-7生产者消息事务