kafka分区数量限制

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka分区数量限制相关的知识,希望对你有一定的参考价值。

参考技术A kafka分区过的弊端
kafka分区数量是理论是无限的。但是无限多的分区也是会影响kafka和kafka所在的服务器的性能 。

如何确定分区数量
可以遵循一定的步骤来尝试确定分区数:创建一个只有1个分区的topic,然后测试这个topic的producer吞吐量和consumer吞吐量。假设它们的值分别是Tp和Tc,单位可以是MB/s。然后假设总的目标吞吐量是Tt,那么分区数 = Tt / max(Tp, Tc)

使用压测工具,得出最佳分区数
kafka官方也提供了脚本方便我们针对我们的kafka集群做测试,我们可以测试当前提供的硬件条件进行压测,得出当前机器环境到底能支持多少分区数,从而达到尽量最优的方案。
生产者性能测试脚本:kafka-producer-perf-test.sh
消费者性能测试脚本:kafka-consumer-perf-test.sh

脚本的使用方法

如果一个受限制,如何在kafka中的剩余分区上应用循环法

我已经限制了一个主题的一个分区用于特定服务(因此所有请求都将在此处到达服务X)。对于任何其他服务请求将到达剩余的N个分区。

在java中我通过org.apache.kafka.clients.producer.Partitioner接口实现它。

@Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        String partitionKey = (String) key;

        if(Channel.DB.getValue().equalsIgnoreCase(partitionKey) && ( KafkaTopic.TRANS.getValue().equalsIgnoreCase(topic) || KafkaTopic.CONS.getValue().equalsIgnoreCase(topic) )){
            return 1; // this is reserved for SERVICE X only
        }

        return 0; // here i want to produce messages on remaining partitions, how to return partition now?
    }

问题:1:如何返回分区号。在这种情况下2:如何生成其他消息作为循环除了服务X的分区。

我正在使用Apache Kafka 9.0.1。

答案

下面的代码对我有用 - 这里的想法是,当密钥不是保留分区时,从可用分区列表中删除该特定分区并循环其余分区。

private final AtomicInteger counter = new AtomicInteger(0);

public static final int SPECIAL_PARTITION_ID = 1;

@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();

    String partitionKey = (String) key;

    if ("SPECIAL_CUSTOMER".equals(partitionKey)) {
        LOGGER.info("PARTITION= " + SPECIAL_PARTITION_ID);
        return SPECIAL_PARTITION_ID; //special partition reserved for MY_SPECIAL_CUSTOMER
    } else {
        int nextValue = counter.getAndIncrement();

        List<PartitionInfo> availablePartitions = new ArrayList<>(cluster.availablePartitionsForTopic(topic));

        if (availablePartitions.size() > 0) {

            PartitionInfo specialPartition = null;

            for (PartitionInfo partitionInfo : availablePartitions) {
                if (partitionInfo.partition() == SPECIAL_PARTITION_ID) {
                    specialPartition = partitionInfo;
                    break;
                }
            }

            availablePartitions.remove(specialPartition);

            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            //optional -- depending upon your usecase
            while (true) {
                int p = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
                if(p != SPECIAL_PARTITION_ID) {
                    return p;
                }
            }
        }
    }
}

如果您只是确保一个密钥总是转到保留分区而其他密钥可能会进行循环,包括特殊分区,那么当密钥用于保留分区时,您可以通过传递partitionId轻松实现它,否则只需根本没有传递密钥,这可以节省你编写自定义分区程序。

此外,如果您不介意保留分区是最后一个分区而其余分区转到其他分区,则可能会有更简单的实现(摘自“Kafka:权威指南”一书)

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;

public class BananaPartitioner implements Partitioner {

    public void configure(Map<String, ?> configs) {} 1

    public int partition(String topic, Object key, byte[] keyBytes,
                           Object value, byte[] valueBytes,
                             Cluster cluster) {
        List<PartitionInfo> partitions =
          cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if ((keyBytes == null) || (!(key instanceOf String))) 2
            throw new InvalidRecordException("We expect all messages
              to have customer name as key")

        if (((String) key).equals("Banana"))
            return numPartitions; // Banana will always go to last
                                     partition

        // Other records will get hashed to the rest of the
           partitions
        return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1))
    }

    public void close() {}
}

以上是关于kafka分区数量限制的主要内容,如果未能解决你的问题,请参考以下文章

Spark-sql读取hive分区表限制分区过滤条件及限制分区数量

硬盘容量是否受限制是

Kafka-分区分配规则

Cassandra 细胞数量限制

kafka的partiton限制拉取数据条数问题定位

kafka的partiton限制拉取数据条数问题定位