如何在kafka中创建topic

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何在kafka中创建topic相关的知识,希望对你有一定的参考价值。

参考技术A

[Toc]

在使用kafka发送消息和消费消息之前,必须先要创建topic,在kafka中创建topic的方式有以下3种:

通过 kafka-topics.sh 脚本来创建一个名为 topic-test1 并且副本数为2、分区数为4的topic。(如无特殊说明,本文所述都是基于1.0.0版本。)

打开kafka-topics.sh脚本一探究竟,其内容只有一行,具体如下:

这个脚本的主要作用就是运行 kafka.admin.TopicCommand 。在main方法中判断参数列表中是否包含有 create ,如果有,那么就实施创建topic的任务。

创建topic时除了需要zookeeper的地址参数外,还需要指定topic的名称、副本因子replication-factor以及分区个数partitions等必选参数 ,还可以包括disable-rack-aware、config、if-not-exists等可选参数。

创建topic的时候,如果名称中包含 . 或者 _ ,kafka会抛出警告。原因是:

在Kafka的内部做埋点时会根据topic的名称来命名metrics的名称,并且会将句点号 . 改成下划线 _ 。假设遇到一个topic的名称为 topic.1_2 ,还有一个topic的名称为 topic_1.2 ,那么最后的metrics的名称都为 topic_1_2 ,所以就会发生名称冲突。

     topic的命名不推荐(虽然可以这样做)使用双下划线 __ 开头,因为以双下划线开头的topic一般看作是kafka的内部topic,比如 __consumer_offsets 和 __transaction_state 。

topic的名称必须满足如下规则:

我们如何使用 API 从 IDE 在 Kafka 中创建主题

【中文标题】我们如何使用 API 从 IDE 在 Kafka 中创建主题【英文标题】:How Can we create a topic in Kafka from the IDE using API 【发布时间】:2013-06-01 13:22:56 【问题描述】:

,因为当我这样做时:

bin/kafka-create-topic.sh --topic mytopic --replica 3 --zookeeper localhost:2181

我得到错误:

bash: bin/kafka-create-topic.sh: No such file or directory

我按照开发人员的设置进行操作。

【问题讨论】:

在运行命令之前,您应该位于 KAFKA_BASE_DIR(安装您的 kafka 的位置,例如 /var/kafka)内。 直接进入bin里面然后写sh kafka-create-topic.sh --topic mytopic --replica 3 --zookeeper localhost:2181。或者 sh ./kafka-create-topic.sh --topic mytopic --replica 3 --zookeeper localhost:2181 bin 前面的一个目录。 你需要按照bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test 的描述做一些事情here @Hild 将 bin 添加到 PATH 也可以正常工作 【参考方案1】:

基于最新的kafka-client api和Kafka 2.1.1,工作版本代码如下:

使用 sbt 导入最新的 kafka-clients。

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += Seq("org.apache.kafka" % "kafka-clients" % "2.1.1",
"org.apache.kafka" %% "kafka" % "2.1.1")

scala中创建主题的代码:

import java.util.Arrays
import java.util.Properties

import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.admin.AdminClient, AdminClientConfig

class CreateKafkaTopic 
  def create(): Unit = 
    val config = new Properties()
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.30.1.5:9092")

    val localKafkaAdmin = AdminClient.create(config)

    val partitions = 3
    val replication = 1.toShort
    val topic = new NewTopic("integration-02", partitions, replication)
    val topics = Arrays.asList(topic)

    val topicStatus = localKafkaAdmin.createTopics(topics).values()
    //topicStatus.values()
    println(topicStatus.keySet())
  


使用以下方法验证新主题:

./kafka-topics.sh --zookeeper 192.30.1.5:2181 --list

希望它对某人有所帮助。 参考:http://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html

【讨论】:

【参考方案2】:

我们可以使用AdminZkClient 来管理Kafka服务器中的主题。

String zookeeperHost = "127.0.0.1:2181";
Boolean isSucre = false;
int sessionTimeoutMs = 200000;
int connectionTimeoutMs = 15000;
int maxInFlightRequests = 10;
Time time = Time.SYSTEM;
String metricGroup = "myGroup";
String metricType = "myType";
KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperHost,isSucre,sessionTimeoutMs,
                connectionTimeoutMs,maxInFlightRequests,time,metricGroup,metricType);

AdminZkClient adminZkClient = new AdminZkClient(zkClient);

String topicName1 = "myTopic";
int partitions = 3;
int replication = 1;
Properties topicConfig = new Properties();

adminZkClient.createTopic(topicName1,partitions,replication,
            topicConfig,RackAwareMode.Disabled$.MODULE$);

您可以参考此链接了解详细信息 https://www.analyticshut.com/streaming-services/kafka/create-and-list-kafka-topics-in-java/

【讨论】:

【参考方案3】:

从 0.11.0.0 开始,您只需要:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

此工件现在包含AdminClient (org.apache.kafka.clients.admin)。

AdminClient 可以处理许多 Kafka 管理任务,包括创建主题:

Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");

AdminClient admin = AdminClient.create(config);

Map<String, String> configs = new HashMap<>();
int partitions = 1;
int replication = 1;

admin.createTopics(asList(new NewTopic("topic", partitions, replication).configs(configs)));

此命令的输出是 CreateTopicsResult,您可以使用它为整个操作或每个单独的主题创建获取 Future

要获得整个操作的未来,请使用CreateTopicsResult#all()。 要单独获取所有主题的Futures,请使用CreateTopicsResult#values()

例如:

CreateTopicsResult result = ...
KafkaFuture<Void> all = result.all();

或:

CreateTopicsResult result = ...
for (Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) 
    try 
        entry.getValue().get();
        log.info("topic  created", entry.getKey());
     catch (InterruptedException | ExecutionException e) 
        if (Throwables.getRootCause(e) instanceof TopicExistsException) 
            log.info("topic  existed", entry.getKey());
        
    

KafkaFuture 是“一个支持调用链和其他异步编程模式的灵活未来”,并且“最终将成为 Java 8 的CompletebleFuture 之上的一个薄垫片。”

【讨论】:

永远挂起 :-(【参考方案4】:

从 Kafka 0.10.1 开始,Michael 提到的 ZKStringSerializer 是私有的(对于 Scala)。您可以使用 ZkUtils 中的工厂方法 createZkClient 或 createZkClientAndConnection。

Kafka 0.10.1 的 Scala 示例:

import kafka.utils.ZkUtils

val sessionTimeoutMs = 10000
val connectionTimeoutMs = 10000
val (zkClient, zkConnection) = ZkUtils.createZkClientAndConnection(
  "localhost:2181", sessionTimeoutMs, connectionTimeoutMs) 

然后按照 Michael 的建议创建主题:

import kafka.admin.AdminUtils

val zkUtils = new ZkUtils(zkClient, zkConnection, false)
val numPartitions = 4
val replicationFactor = 1
val topicConfig = new Properties
val topic = "my-topic"
AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, topicConfig)

【讨论】:

【参考方案5】:

在 Kafka 0.8.1+(截至目前最新版本的 Kafka)中,您可以通过 AdminCommand 以编程方式创建新主题。在此问题的先前答案之一中提到的CreateTopicCommand(旧版 Kafka 0.8.0 的一部分)的功能已移至AdminCommand

Kafka 0.8.1 的 Scala 示例:

import kafka.admin.AdminUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient

// Create a ZooKeeper client
val sessionTimeoutMs = 10000
val connectionTimeoutMs = 10000
// Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
// createTopic() will only seem to work (it will return without error).  The topic will exist in
// only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
// topic.
val zkClient = new ZkClient("zookeeper1:2181", sessionTimeoutMs, connectionTimeoutMs,
    ZKStringSerializer)

// Create a topic named "myTopic" with 8 partitions and a replication factor of 3
val topicName = "myTopic"
val numPartitions = 8
val replicationFactor = 3
val topicConfig = new Properties
AdminUtils.createTopic(zkClient, topicName, numPartitions, replicationFactor, topicConfig)

构建依赖,以 sbt 为例:

libraryDependencies ++= Seq(
  "com.101tec" % "zkclient" % "0.4",
  "org.apache.kafka" % "kafka_2.10" % "0.8.1.1"
    exclude("javax.jms", "jms")
    exclude("com.sun.jdmk", "jmxtools")
    exclude("com.sun.jmx", "jmxri"),
  ...
)

编辑:为 Kafka 0.9.0.0 添加了 Java 示例(截至 2016 年 1 月的最新版本)。

Maven 依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.9.0.0</version>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.7</version>
</dependency>

代码:

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

public class KafkaJavaExample 

  public static void main(String[] args) 
    String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
    int sessionTimeoutMs = 10 * 1000;
    int connectionTimeoutMs = 8 * 1000;
    // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
    // createTopic() will only seem to work (it will return without error).  The topic will exist in
    // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
    // topic.
    ZkClient zkClient = new ZkClient(
        zookeeperConnect,
        sessionTimeoutMs,
        connectionTimeoutMs,
        ZKStringSerializer$.MODULE$);

    // Security for Kafka was added in Kafka 0.9.0.0
    boolean isSecureKafkaCluster = false;
    ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);

    String topic = "my-topic";
    int partitions = 2;
    int replication = 3;
    Properties topicConfig = new Properties(); // add per-topic configurations settings here
    AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig);
    zkClient.close();
  


编辑 2:为 Kafka 0.10.2.0 添加了 Java 示例(截至 2017 年 4 月的最新版本)。

Maven 依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.2.0</version>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.9</version>
</dependency>

代码:

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

public class KafkaJavaExample 

  public static void main(String[] args) 
    String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
    int sessionTimeoutMs = 10 * 1000;
    int connectionTimeoutMs = 8 * 1000;

    String topic = "my-topic";
    int partitions = 2;
    int replication = 3;
    Properties topicConfig = new Properties(); // add per-topic configurations settings here

    // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
    // createTopic() will only seem to work (it will return without error).  The topic will exist in
    // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
    // topic.
    ZkClient zkClient = new ZkClient(
        zookeeperConnect,
        sessionTimeoutMs,
        connectionTimeoutMs,
        ZKStringSerializer$.MODULE$);

    // Security for Kafka was added in Kafka 0.9.0.0
    boolean isSecureKafkaCluster = false;

    ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);
    AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$);
    zkClient.close();
  


【讨论】:

请注意,当不使用 ZKStringSerializer 初始化 ZkClient 时,createTopic 将无错误返回。该主题会存在于zookeeper中,并在列出主题时返回,但Kafka本身并不创建该主题。 @quux00 您可以使用我编写的以下代码轻松创建它:ZkClient zkClient = new ZkClient(zkServer); zkClient.setZkSerializer(ZKStringSerializer$.MODULE$); 请参考:source code for zkClient link 【参考方案6】:

您正在尝试使用哪个 IDE?

请提供完整路径,下面是来自终端的命令,它将创建一个主题

    cd kafka/bin ./kafka-create-topic.sh --topic test --zookeeper localhost:2181

【讨论】:

【参考方案7】:

如果您使用的是 Kafka 0.10.0.0+,从 Java 创建主题需要传递 RackAwareMode 类型的参数。这是一个 Scala 案例对象,从 Java 中获取它的实例很棘手(例如证明:How do I "get" a Scala case object from Java?。但它不适用于我们的案例)。

幸运的是,rackAwareMode 是一个可选参数。然而 Java 不支持可选参数。我们如何解决这个问题?这是一个解决方案:

AdminUtils.createTopic(zkUtils, topic, 1, 1, 
    AdminUtils.createTopic$default$5(),
    AdminUtils.createTopic$default$6());

将它与 miguno 的答案一起使用,你就可以开始了。

【讨论】:

【参考方案8】:

从 Kafka 0.8 Producer Example 开始,下面的示例将创建一个名为 page_visits 的主题,并且如果在 Kafka Broker config 文件中将 auto.create.topics.enable 属性设置为 true(默认),也会开始生成

import java.util.*;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer 
    public static void main(String[] args) 
        long events = Long.parseLong(args[0]);
        Random rnd = new Random();

        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "example.producer.SimplePartitioner");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);

        Producer<String, String> producer = new Producer<String, String>(config);

        for (long nEvents = 0; nEvents < events; nEvents++)  
            long runtime = new Date().getTime();  
            String ip = “192.168.2.” + rnd.nextInt(255); 
            String msg = runtime + “,www.example.com,” + ip; 
            KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
            producer.send(data);
        
        producer.close();
   

【讨论】:

我相信您知道从技术上讲,这并不像 OP 所要求的那样通过 API 创建主题。它只是使用auto.create.topics.enable 功能在引用不存在的主题时创建一个主题。并且使用这种方法,除了在代理配置文件中默认硬编码之外,没有其他方法可以指定主题设置。【参考方案9】:

您可以尝试使用 kafka.admin.CreateTopicCommand scala 类从 Java 代码创建主题...提供必要的参数。

String [] arguments = new String[8];
arguments[0] = "--zookeeper";
arguments[1] = "10.***.***.***:2181";
arguments[2] = "--replica";
arguments[3] = "1";
arguments[4] = "--partition";
arguments[5] = "1";
arguments[6] = "--topic";
arguments[7] = "test-topic-Biks";

CreateTopicCommand.main(arguments);

注意:您应该为 jopt-simple-4.5zkclient-0.1 添加 maven 依赖项

【讨论】:

【参考方案10】:

要通过 java api 和 Kafka 0.8+ 创建主题,请尝试以下操作,

首先导入下面的语句

import kafka.utils.ZKStringSerializer$;

通过以下方式为ZkClient创建对象,

ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

【讨论】:

【参考方案11】:

您的电话无法正常工作的几种方式。

    如果您的 Kafka 集群没有足够的节点来支持复制值 3。

    如果有 chroot 路径前缀,则必须在 zookeeper 端口后附加它

    运行时你不在 Kafka 安装目录中(这很有可能)

【讨论】:

以上是关于如何在kafka中创建topic的主要内容,如果未能解决你的问题,请参考以下文章

如何从具有小数类型列的主题在 ksql 中创建流

如何通过 Java 在 Kafka 中创建主题

如何在 apache kafka 中创建主题?

如何在 kafka 中创建自定义序列化程序?

如何使用Python以编程方式在Apache Kafka中创建主题

如何从kafka主题为ksqldb创建主题