卡夫卡:如何在卡夫卡实现循环分区

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了卡夫卡:如何在卡夫卡实现循环分区相关的知识,希望对你有一定的参考价值。

我是卡夫卡的新人。我的要求是,我有两个分区,例如Partition-0和Partition-1,我有一个值列表,它们也包含KEY值。我想根据我的密钥存储数据,如key-1将转到Partition-0,key-2将转到Partition-1。使用旧的API,可以实现像我们需要实现分区接口,但我如何使用新API。谢谢

答案

使用新的生产者,您还可以实现Partitioner接口(https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java)来实现循环分发。

您可以使用DefaultPartitioner作为参考 - https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java

另一答案

如果你想要循环行为,只要在写入Producer时不传递密钥,DefaultPartitioner将为你完成工作。您不需要编写自定义实现。来自javadocs:

/**
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose a partition in a round-robin fashion
 */
另一答案

您可以通过覆盖卡夫卡制作人的default partitioner以循环方式制作卡夫卡。

一个伪实现

class RRPartitioner():
      def __init__():
            # Using topic metadata get total number of partitions
            self.total_partitions = client[topic].get_number_partitions()
            self.part_offset = 0

      def partitioner(self, key, msg):
          if self.part_offset > self.total_partitions:
              self.part_offset = 0
              return self.part_offset
          else:
              self.part_offset += 1
              return self.part_offset

以上实现它是纯粹的循环,如果你想根据密钥订购消息并且循环,你将需要在自定义分区器中做更多。

以上是关于卡夫卡:如何在卡夫卡实现循环分区的主要内容,如果未能解决你的问题,请参考以下文章

卡夫卡消费者:受控阅读主题

卡夫卡多个生产者写同一主题?

kafka 分区和副本以及kafaka 执行流程,以及消息的高可用

卡夫卡长轮询

雪花卡夫卡连接器限制?

卡夫卡流与卡夫卡消费者如何决定使用啥