从Java API创建Kafka主题[重复]

Posted

技术标签:

【中文标题】从Java API创建Kafka主题[重复]【英文标题】:Kafka Topic creation from Java API [duplicate] 【发布时间】:2018-03-31 03:38:42 【问题描述】:

我正在尝试使用 Java API 创建一个 Kafka 主题,但无法获得 LEADER。

代码:

int partition = 0;
        ZkClient zkClient = null;
        try 
            String zookeeperHosts = "localhost:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
            int sessionTimeOutInMs = 15 * 1000; // 15 secs
            int connectionTimeOutInMs = 10 * 1000; // 10 secs

            zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);

            String topicName = "mdmTopic5";
            int noOfPartitions = 2;
            int noOfReplication = 1;
            Properties topicConfiguration = new Properties();
            AdminUtils.createTopic(zkClient, topicName, noOfPartitions, noOfReplication, topicConfiguration);

         catch (Exception ex) 
            ex.printStackTrace();
         finally 
            if (zkClient != null) 
                zkClient.close();
            
        

错误:

[2017-10-19 12:14:42,263] WARN Error while fetching metadata with correlation id 1 : mdmTopic5=LEADER_NOT_AVAILABLE (org.apache.kafka.clients.NetworkClient)
[2017-10-19 12:14:42,370] WARN Error while fetching metadata with correlation id 3 : mdmTopic5=LEADER_NOT_AVAILABLE (org.apache.kafka.clients.NetworkClient)
[2017-10-19 12:14:42,479] WARN Error while fetching metadata with correlation id 4 : mdmTopic5=LEADER_NOT_AVAILABLE (org.apache.kafka.clients.NetworkClient)

Kafka 0.11.0.1 是否支持 AdminUtils.???请告诉我如何在这个版本中创建主题。

提前致谢。

【问题讨论】:

已经在这里回答了同样的问题:***.com/a/52209029/1545425 【参考方案1】:

自 Kafka 0.11 以来,有一个适当的管理 API 用于创建(和删除)主题,我建议使用它而不是直接连接到 Zookeeper。

参见 AdminClient.createTopics():http://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/admin/AdminClient.html#createTopics(java.util.Collection)

【讨论】:

【参考方案2】:

通常LEADER NOT AVAILABLE 指向网络问题,而不是您的代码问题。 试试:

telnet host port 看看您是否可以从您的机器连接到所有必需的主机/端口。

但是,最新的方法是在创建主题时使用BOOTSTRAP_SERVERS

使用 Scala 的主题创建代码的工作版本如下:

使用 sbt 导入所需的kafka-clients

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += Seq("org.apache.kafka" % "kafka-clients" % "2.1.1")

scala中创建主题的代码:

import java.util.Arrays
import java.util.Properties

import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.admin.AdminClient, AdminClientConfig

class CreateKafkaTopic 
  def create(): Unit = 
    val config = new Properties()
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.30.1.5:9092")

    val localKafkaAdmin = AdminClient.create(config)

    val partitions = 3
    val replication = 1.toShort
    val topic = new NewTopic("integration-02", partitions, replication)
    val topics = Arrays.asList(topic)

    val topicStatus = localKafkaAdmin.createTopics(topics).values()
    //topicStatus.values()
    println(topicStatus.keySet())
  


希望对你有帮助。

【讨论】:

以上是关于从Java API创建Kafka主题[重复]的主要内容,如果未能解决你的问题,请参考以下文章

我们如何使用 API 从 IDE 在 Kafka 中创建主题

如何检查kafka主题中的消息数量

无法从自动创建的 kafka 主题中获取消息

如何从kafka主题为ksqldb创建主题

kafka消费者动态检测添加的主题

如何从同一生产者向不同的 Kafka 主题和模式注册表生成消息