Getting Started with Kafka 2: Creating and Deleting Topics in Java

Posted MIC2016

1.pom

  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.2.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.1</version>
  </dependency>

 

2.KafkaTopicBean

public class KafkaTopicBean {
    
    private String topicName;       // topic name
    private Integer partition;      // number of partitions
    private Integer replication;    // replication factor (number of replicas)
    private String descrbe;         // free-text description; KafkaUtil does not send it to Kafka
    
    public String getTopicName() {  
        return topicName;  
    }  
  
    public void setTopicName(String topicName) {  
        this.topicName = topicName;  
    }  
  
    public Integer getPartition() {  
        return partition;  
    }  
  
    public void setPartition(Integer partition) {  
        this.partition = partition;  
    }  
  
    public Integer getReplication() {  
        return replication;  
    }  
  
    public void setReplication(Integer replication) {  
        this.replication = replication;  
    }  
  
    public String getDescrbe() {  
        return descrbe;  
    }  
  
    public void setDescrbe(String descrbe) {  
        this.descrbe = descrbe;  
    }  
  
    @Override  
    public String toString() {  
        return "KafkaTopicBean [topicName=" + topicName + ", partition=" + partition  
                + ", replication=" + replication + ", descrbe=" + descrbe +"]";  
    }  

}
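
Because the bean stores `partition` and `replication` as `Integer`, either may still be null when the topic is created, and unboxing a null value throws a `NullPointerException`. The sketch below shows one way to check the values first. `TopicValidator` and its methods are hypothetical names that do not appear in the original post or in Kafka; the name rules it applies (ASCII letters, digits, `.`, `_` and `-`, at most 249 characters, not `.` or `..`) are the ones Kafka enforces for topic names.

```java
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not part of Kafka or of the original post.
final class TopicValidator {

    // Kafka accepts ASCII letters, digits, '.', '_' and '-' in topic names.
    private static final Pattern LEGAL_NAME = Pattern.compile("[a-zA-Z0-9._-]+");

    // Kafka limits topic names to 249 characters.
    private static final int MAX_NAME_LENGTH = 249;

    private TopicValidator() {
    }

    // True when the name is non-null, legal, short enough, and not "." or "..".
    static boolean isValidName(String name) {
        return name != null
                && name.length() <= MAX_NAME_LENGTH
                && !name.equals(".")
                && !name.equals("..")
                && LEGAL_NAME.matcher(name).matches();
    }

    // True when a partition count or replication factor is set and positive.
    static boolean isValidCount(Integer value) {
        return value != null && value > 0;
    }

    // Returns null when all values are usable, otherwise a description of the first problem.
    static String firstProblem(String topicName, Integer partition, Integer replication) {
        if (!isValidName(topicName)) {
            return "illegal topic name: " + topicName;
        }
        if (!isValidCount(partition)) {
            return "partition must be a positive number, got: " + partition;
        }
        if (!isValidCount(replication)) {
            return "replication must be a positive number, got: " + replication;
        }
        return null;
    }
}
```

A caller could pass `topic.getTopicName()`, `topic.getPartition()` and `topic.getReplication()` to `firstProblem` and skip topic creation when the result is not null.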

 

3.KafkaUtil

import java.util.Properties;
import org.apache.kafka.common.security.JaasUtils;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;

public class KafkaUtil {
    
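    // Creates the topic through the ZooKeeper ensemble at ZkStr (host:port).
    public static void createKafaTopic(String ZkStr, KafkaTopicBean topic) {
        // Session timeout and connection timeout are both 30000 ms.
        ZkUtils zkUtils = ZkUtils.apply(ZkStr, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        try {
            // RackAwareMode.Enforced is a Scala object; Java reaches it through MODULE$.
            AdminUtils.createTopic(zkUtils, topic.getTopicName(), topic.getPartition(),
                    topic.getReplication(), new Properties(), RackAwareMode.Enforced$.MODULE$);
        } finally {
            zkUtils.close();  // release the ZooKeeper connection even if creation fails
        }
    }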
     
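    // Marks the topic for deletion; brokers only remove it when delete.topic.enable=true.
    public static void deleteKafaTopic(String ZkStr, KafkaTopicBean topic) {
        ZkUtils zkUtils = ZkUtils.apply(ZkStr, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        try {
            AdminUtils.deleteTopic(zkUtils, topic.getTopicName());
        } finally {
            zkUtils.close();  // release the ZooKeeper connection even if deletion fails
        }
    }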

}
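
The `new Properties()` argument passed to `AdminUtils.createTopic` carries the topic-level configuration, and leaving it empty means the topic uses the broker defaults. As a rough sketch of what a non-empty configuration could look like, the helper below fills in two topic-level keys, `retention.ms` and `cleanup.policy`. `TopicConfigs` and `withRetention` are hypothetical names introduced here for illustration, and the values are only examples.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of Kafka or of the original post.
final class TopicConfigs {

    private TopicConfigs() {
    }

    // Builds topic-level overrides: how long to keep messages and how old log segments are cleaned up.
    static Properties withRetention(long retentionMs, boolean compacted) {
        Properties props = new Properties();
        // retention.ms: how long the topic keeps messages, in milliseconds.
        props.setProperty("retention.ms", Long.toString(retentionMs));
        // cleanup.policy: "delete" discards old segments, "compact" keeps the latest value per key.
        props.setProperty("cleanup.policy", compacted ? "compact" : "delete");
        return props;
    }
}
```

Assuming these overrides are wanted, `TopicConfigs.withRetention(86400000L, false)` could be passed to `createTopic` in place of `new Properties()` to keep messages for one day.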

 

4.Usage

    public static void main(String[] args) {

        // ZooKeeper address in host:port form
        String ZkStr = "192.168.1.101:2181";

        // Describe the topic to create
        KafkaTopicBean topic = new KafkaTopicBean();
        topic.setTopicName("testTopic");
        topic.setPartition(1);
        topic.setReplication(1);

        // Create the topic
        KafkaUtil.createKafaTopic(ZkStr, topic);
        // Delete the topic
        KafkaUtil.deleteKafaTopic(ZkStr, topic);

    }

 
