kafka 获取主题的分区数

Posted

技术标签:

【中文标题】kafka 获取主题的分区数【英文标题】:kafka get partition count for a topic 【发布时间】:2016-05-28 00:49:41 【问题描述】:

如何从代码中获取任何 kafka 主题的分区数。我研究了许多链接,但似乎都没有。

提几个:

http://grokbase.com/t/kafka/users/148132gdzk/find-topic-partition-count-through-simpleclient-api

http://grokbase.com/t/kafka/users/151cv3htga/get-replication-and-partition-count-of-a-topic

http://qnalist.com/questions/5809219/get-replication-and-partition-count-of-a-topic

看起来类似的讨论。

SO 上也有类似的链接,但没有有效的解决方案。

【问题讨论】:

哪个 Kafka 版本? vish4071,接受你最终使用的解决方案怎么样? 【参考方案1】:

转到您的kafka/bin 目录。

然后运行这个:

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic_name

您应该在PartitionCount 下看到您需要的内容。

Topic:topic_name        PartitionCount:5        ReplicationFactor:1     Configs:
        Topic: topic_name       Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: topic_name       Partition: 1    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: topic_name       Partition: 2    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: topic_name       Partition: 3    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: topic_name       Partition: 4    Leader: 1001    Replicas: 1001  Isr: 1001

当使用 Zookeeper 不再是 Kafka 依赖项的版本时

kafka-topics --describe --bootstrap-server localhost:9092 --topic topic_name

【讨论】:

赞成,因为我碰巧正在寻找一个非代码解决方案,这是完美的。 如果你想总结一个主题正则表达式的所有分区(包括副本),请参阅下面的答案。【参考方案2】:

在 0.82 Producer API 和 0.9 Consumer api 中,您可以使用类似

Properties configProperties = new Properties();
configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties);
producer.partitionsFor("test")

【讨论】:

这假设我使用 KafkaConsumer 作为我的消费者,但我使用的是 ConsumerConnector。参考:cwiki.apache.org/confluence/display/KAFKA/…【参考方案3】:

我是这样做的:

  /**
   * Retrieves list of all partitions IDs of the given @code topic.
   * 
   * @param topic
   * @param seedBrokers List of known brokers of a Kafka cluster
   * @return list of partitions or empty list if none found
   */
  public static List<Integer> getPartitionsForTopic(String topic, List<BrokerInfo> seedBrokers) 
    for (BrokerInfo seed : seedBrokers) 
      SimpleConsumer consumer = null;
      try 
        consumer = new SimpleConsumer(seed.getHost(), seed.getPort(), 20000, 128 * 1024, "partitionLookup");
        List<String> topics = Collections.singletonList(topic);
        TopicMetadataRequest req = new TopicMetadataRequest(topics);
        kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

        List<Integer> partitions = new ArrayList<>();
        // find our partition's metadata
        List<TopicMetadata> metaData = resp.topicsMetadata();
        for (TopicMetadata item : metaData) 
          for (PartitionMetadata part : item.partitionsMetadata()) 
            partitions.add(part.partitionId());
          
        
        return partitions;  // leave on first successful broker (every broker has this info)
       catch (Exception e) 
        // try all available brokers, so just report error and go to next one
        LOG.error("Error communicating with broker [" + seed + "] to find list of partitions for [" + topic + "]. Reason: " + e);
       finally 
        if (consumer != null)
          consumer.close();
      
    
    throw new RuntimeError("Could not get partitions");
  

请注意,我只需要提取分区 ID,但您还可以检索任何其他分区元数据,例如 leaderisrreplicas、... 而BrokerInfo 只是一个简单的 POJO,它具有 hostport 字段。

【讨论】:

【参考方案4】:

shell cmd下面可以打印分区数。在执行 cmd 之前,您应该在 kafka bin 目录中:

sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic **TopicName** | awk 'print $2' | uniq -c |awk 'NR==2print "count of partitions=" $1'

请注意,您必须根据需要更改主题名称。 您也可以使用 if 条件进一步验证这一点:

sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic **TopicName** | awk 'print $2' | uniq -c |awk 'NR==2if ($1=="16") print "valid partitions"'

如果 count 为 16,上述 cmd 命令打印有效分区。您可以根据需要更改 count。

【讨论】:

【参考方案5】:

在 java 代码中,我们可以使用AdminClient 来获取一个主题的总和部分。

Properties props = new Properties();
props.put("bootstrap.servers", "host:9092");
AdminClient client = AdminClient.create(props);

DescribeTopicsResult result = client.describeTopics(Arrays.asList("TEST"));
Map<String, KafkaFuture<TopicDescription>>  values = result.values();
KafkaFuture<TopicDescription> topicDescription = values.get("TEST");
int partitions = topicDescription.get().partitions().size();
System.out.println(partitions);

【讨论】:

【参考方案6】:

使用来自 KafkaConsumer 的 PartitionList

     //create consumer then loop through topics
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    List<PartitionInfo> partitions = consumer.partitionsFor(topic);

    ArrayList<Integer> partitionList = new ArrayList<>();
    System.out.println(partitions.get(0).partition());

    for(int i = 0; i < partitions.size(); i++)
        partitionList.add(partitions.get(i).partition());
    

    Collections.sort(partitionList);

应该像魅力一样工作。如果有更简单的方法可以从主题访问分区列表,请告诉我。

【讨论】:

【参考方案7】:

因此,以下方法适用于 kafka 0.10,它不使用任何生产者或消费者 API。它使用了 kafka 中 scala API 中的一些类,例如 ZkConnection 和 ZkUtils。

    ZkConnection zkConnection = new ZkConnection(zkConnect);
    ZkUtils zkUtils = new ZkUtils(zkClient,zkConnection,false);
    System.out.println(JavaConversions.mapAsJavaMap(zkUtils.getPartitionAssignmentForTopics(
         JavaConversions.asScalaBuffer(topicList))).get("bidlogs_kafka10").size());

【讨论】:

【参考方案8】:

我遇到了同样的问题,我需要获取主题的分区。

在回答here的帮助下,我能够从Zookeeper那里得到信息。

这是我在 Scala 中的代码(但可以很容易地翻译成 Java)

import org.apache.zookeeper.ZooKeeper

def extractPartitionNumberForTopic(topicName: String, zookeeperQurom: String): Int = 
  val zk = new ZooKeeper(zookeeperQurom, 10000, null);
  val zkNodeName = s"/brokers/topics/$topicName/partitions"
  val numPartitions = zk.getChildren(zkNodeName, false).size
  zk.close()
  numPartitions

使用这种方法,我可以访问有关 Kafka 主题的信息以及有关 Kafka 代理的其他信息...

从 Zookeeper,您可以通过浏览到 /brokers/topics/MY_TOPIC_NAME/partitions

来检查主题的分区数

使用zookeeper-client.sh 连接到您的动物园管理员:

[zk: ZkServer:2181(CONNECTED) 5] ls /brokers/topics/MY_TOPIC_NAME/partitions
[0, 1, 2]

这告诉我们主题MY_TOPIC_NAME有3个分区

【讨论】:

【参考方案9】:
//create the kafka producer
def getKafkaProducer: KafkaProducer[String, String] = 
val kafkaProps: Properties = new Properties()
kafkaProps.put("bootstrap.servers", "localhost:9092")
kafkaProps.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer")
kafkaProps.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer")

new KafkaProducer[String, String](kafkaProps)

val kafkaProducer = getKafkaProducer
val noOfPartition = kafkaProducer.partitionsFor("TopicName") 
println(noOfPartition) //it will print the number of partiton for the given 
//topic

【讨论】:

在较新的版本(kafka-python)中,KafkaConsumer.partitions_for_topic('TopicName') 返回一组所有分区 id。【参考方案10】:

@Sunil-patil 回答没有回答它的计数部分。你必须得到列表的大小

producer.partitionsFor("test").size()

@vish4071 没有必要反对 Sunil,你没有提到你在问题中使用 ConsumerConnector。

【讨论】:

【参考方案11】:

可以从zookeeper-shell中获取分区数

Syntax: ls /brokers/topics/<topic_name>/partitions

下面是例子:

root@zookeeper-01:/opt/kafka_2.11-2.0.0# bin/zookeeper-shell.sh zookeeper-01:2181
Connecting to zookeeper-01:2181
Welcome to ZooKeeper!
JLine support is disabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
ls /brokers/topics/test/partitions
[0, 1, 2, 3, 4]

【讨论】:

【参考方案12】:

你可以像这样从 zookeeper 获取 kafka 分区列表。 它是真正的 kafka 服务器端分区号。

[zk: zk.kafka:2181(CONNECTED) 43] ls /users/test_account/test_kafka_name/brokers/topics/test_kafka_topic_name/partitions
[35, 36, 159, 33, 34, 158, 157, 39, 156, 37, 155, 38, 154, 152, 153, 150, 151, 43, 42, 41, 40, 202, 203, 204, 205, 200, 201, 22, 23, 169, 24, 25, 26, 166, 206, 165, 27, 207, 168, 208, 28, 29, 167, 209, 161, 3, 2, 162, 1, 163, 0, 164, 7, 30, 6, 32, 5, 160, 31, 4, 9, 8, 211, 212, 210, 215, 216, 213, 19, 214, 17, 179, 219, 18, 178, 177, 15, 217, 218, 16, 176, 13, 14, 11, 12, 21, 170, 20, 171, 174, 175, 172, 173, 220, 221, 222, 223, 224, 225, 226, 227, 188, 228, 187, 229, 189, 180, 10, 181, 182, 183, 184, 185, 186, 116, 117, 79, 114, 78, 77, 115, 112, 113, 110, 111, 118, 119, 82, 83, 80, 81, 86, 87, 84, 85, 67, 125, 66, 126, 69, 127, 128, 68, 121, 122, 123, 124, 129, 70, 71, 120, 72, 73, 74, 75, 76, 134, 135, 132, 133, 59, 138, 58, 57, 139, 136, 56, 137, 55, 64, 65, 62, 63, 60, 131, 130, 61, 49, 143, 48, 144, 145, 146, 45, 147, 44, 148, 47, 149, 46, 51, 52, 53, 54, 140, 142, 141, 50, 109, 108, 107, 106, 105, 104, 103, 99, 102, 101, 100, 98, 97, 96, 95, 94, 93, 92, 91, 90, 88, 89, 195, 194, 197, 196, 191, 190, 193, 192, 198, 199, 230, 239, 232, 231, 234, 233, 236, 235, 238, 237]

您可以在消费者代码中使用分区计数。

  def getNumPartitions(topic: String): Int = 
    val zk = CuratorFrameworkFactory.newClient(zkHostList, new RetryNTimes(5, 1000))

    zk.start()
    var numPartitions: Int = 0
    val topicPartitionsPath = zkPath + "/brokers/topics/" + topic + "/partitions"

    if (zk.checkExists().forPath(topicPartitionsPath) != null) 
        try 
            val brokerIdList = zk.getChildren().forPath(topicPartitionsPath).asScala
            numPartitions = brokerIdList.length.toInt
         catch 
            case e: Exception => 
                e.printStackTrace()
              
          
      
    zk.close()

    numPartitions
  

【讨论】:

【参考方案13】:

要获取分区列表,理想/实际的方法是使用 AdminClients API

    Properties properties=new Properties();
    properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
    AdminClient adminClient=KafkaAdminClient.create(properties);
    Map<String, TopicDescription> jension = adminClient.describeTopics(Collections.singletonList("jenison")).all().get();
    System.out.println(jension.get("jenison").partitions().size());

这可以作为独立的 java 方法运行,没有生产者/消费者依赖项。

【讨论】:

【参考方案14】:

您可以探索kafka.utils.ZkUtils,它有许多旨在帮助提取有关集群的元数据的方法。这里的答案很好,所以我只是为了多样性而添加:

import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkClient

def getTopicPartitionCount(zookeeperQuorum: String, topic: String): Int = 
  val client = new ZkClient(zookeeperQuorum)
  val partitionCount = ZkUtils.getAllPartitions(client)
    .count(topicPartitionPair => topicPartitionPair.topic == topic)

  client.close
  partitionCount

【讨论】:

【参考方案15】:
cluster.availablePartitionsForTopic(topicName).size()

【讨论】:

【参考方案16】:

我发现没有一个答案提供了一种快速简便的方法来计算给定主题正则表达式的所有分区。就我而言,我需要查看集群中有多少个分区,包括用于调整大小的副本。

以下是您可以运行的 bash 命令(无需额外工具):

kafka-topics --describe --bootstrap-server broker --topic ".*" | grep Configs | awk 'printf "%d\n", $4*$6' | awk 's+=$1 END print s'

您可以通过将.* 正则表达式替换为您喜欢的任何内容来调整主题正则表达式。还要确保将broker 更改为您的经纪人地址。

详情:

    Stream kafka-topics 描述给定感兴趣主题的输出 仅提取每个主题的第一行,其中包含分区计数和复制因子 PartitionCount 乘以 ReplicationFactor 得到主题的总分区数 汇总所有计数并打印总和

奖金:

如果您安装了 docker,则无需下载 Kafka 二进制文件:

docker run -it confluentinc/cp-kafka:6.0.0 /bin/bash

然后你可以运行它来访问所有的 Kafka 脚本:

cd /usr/bin

【讨论】:

【参考方案17】:

任何正在寻找 python confluent-kafka 包的人

from confluent_kafka.admin import AdminClient

topic_name = 'my_topic'
settings  = 'bootstrap.servers': ["..."]
kadmin = AdminClient(settings)
topic_metadata = kadmin.list_topics(topic_name).topics
     

【讨论】:

以上是关于kafka 获取主题的分区数的主要内容,如果未能解决你的问题,请参考以下文章

如何获取 kafka 主题分区的最后/结束偏移量?

获取主题和分区偏移量

2、kafka如何选定分区数量

Kafka主题中的分区数越多吞吐量就越高?BULLSHIT!!!

是否可以将分区添加到 Kafka 0.8.2 中的现有主题

一天内到达 Kafka 主题的消息数