从Java API创建Kafka主题[重复]
Posted
技术标签:
【中文标题】从Java API创建Kafka主题[重复]【英文标题】:Kafka Topic creation from Java API [duplicate] 【发布时间】:2018-03-31 03:38:42 【问题描述】:我正在尝试使用 Java API 创建一个 Kafka 主题,但无法获得 LEADER。
代码:
int partition = 0;
ZkClient zkClient = null;
try
String zookeeperHosts = "localhost:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
int sessionTimeOutInMs = 15 * 1000; // 15 secs
int connectionTimeOutInMs = 10 * 1000; // 10 secs
zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
String topicName = "mdmTopic5";
int noOfPartitions = 2;
int noOfReplication = 1;
Properties topicConfiguration = new Properties();
AdminUtils.createTopic(zkClient, topicName, noOfPartitions, noOfReplication, topicConfiguration);
catch (Exception ex)
ex.printStackTrace();
finally
if (zkClient != null)
zkClient.close();
错误:
[2017-10-19 12:14:42,263] WARN Error while fetching metadata with correlation id 1 : mdmTopic5=LEADER_NOT_AVAILABLE (org.apache.kafka.clients.NetworkClient)
[2017-10-19 12:14:42,370] WARN Error while fetching metadata with correlation id 3 : mdmTopic5=LEADER_NOT_AVAILABLE (org.apache.kafka.clients.NetworkClient)
[2017-10-19 12:14:42,479] WARN Error while fetching metadata with correlation id 4 : mdmTopic5=LEADER_NOT_AVAILABLE (org.apache.kafka.clients.NetworkClient)
Kafka 0.11.0.1 是否支持 AdminUtils.???请告诉我如何在这个版本中创建主题。
提前致谢。
【问题讨论】:
已经在这里回答了同样的问题:***.com/a/52209029/1545425 【参考方案1】:自 Kafka 0.11 以来,有一个适当的管理 API 用于创建(和删除)主题,我建议使用它而不是直接连接到 Zookeeper。
参见 AdminClient.createTopics():http://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/admin/AdminClient.html#createTopics(java.util.Collection)
【讨论】:
【参考方案2】:通常LEADER NOT AVAILABLE
指向网络问题,而不是您的代码问题。
试试:
telnet host port
看看您是否可以从您的机器连接到所有必需的主机/端口。
但是,最新的方法是在创建主题时使用BOOTSTRAP_SERVERS
。
使用 Scala 的主题创建代码的工作版本如下:
使用 sbt 导入所需的kafka-clients
。
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += Seq("org.apache.kafka" % "kafka-clients" % "2.1.1")
scala中创建主题的代码:
import java.util.Arrays
import java.util.Properties
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.admin.AdminClient, AdminClientConfig
class CreateKafkaTopic
def create(): Unit =
val config = new Properties()
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.30.1.5:9092")
val localKafkaAdmin = AdminClient.create(config)
val partitions = 3
val replication = 1.toShort
val topic = new NewTopic("integration-02", partitions, replication)
val topics = Arrays.asList(topic)
val topicStatus = localKafkaAdmin.createTopics(topics).values()
//topicStatus.values()
println(topicStatus.keySet())
希望对你有帮助。
【讨论】:
以上是关于从Java API创建Kafka主题[重复]的主要内容,如果未能解决你的问题,请参考以下文章