java 嵌入Kafka + Zookeeper用于测试目的。使用Apache Kafka 0.8进行测试
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java 嵌入Kafka + Zookeeper用于测试目的。使用Apache Kafka 0.8进行测试相关的知识,希望对你有一定的参考价值。
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Properties;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
public class ZooKeeperLocal {
ZooKeeperServerMain zooKeeperServer;
public ZooKeeperLocal(Properties zkProperties) throws FileNotFoundException, IOException{
QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
try {
quorumConfiguration.parseProperties(zkProperties);
} catch(Exception e) {
throw new RuntimeException(e);
}
zooKeeperServer = new ZooKeeperServerMain();
final ServerConfig configuration = new ServerConfig();
configuration.readFrom(quorumConfiguration);
new Thread() {
public void run() {
try {
zooKeeperServer.runFromConfig(configuration);
} catch (IOException e) {
System.out.println("ZooKeeper Failed");
e.printStackTrace(System.err);
}
}
}.start();
}
}
public class MyTest {
static KafkaLocal kafka;
@BeforeClass
public static void startKafka(){
Properties kafkaProperties = new Properties();
Properties zkProperties = new Properties();
try {
//load properties
kafkaProperties.load(Class.class.getResourceAsStream("/kafkalocal.properties"));
zkProperties.load(Class.class.getResourceAsStream("/zklocal.properties"));
//start kafka
kafka = new KafkaLocal(kafkaProperties, zkProperties);
Thread.sleep(5000);
} catch (Exception e){
e.printStackTrace(System.out);
fail("Error running local Kafka broker");
e.printStackTrace(System.out);
}
//do other things
}
@Test
public void testSomething() {
}
}
import java.io.IOException;
import java.util.Properties;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
public class KafkaLocal {
public KafkaServerStartable kafka;
public ZooKeeperLocal zookeeper;
public KafkaLocal(Properties kafkaProperties, Properties zkProperties) throws IOException, InterruptedException{
KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
//start local zookeeper
System.out.println("starting local zookeeper...");
zookeeper = new ZooKeeperLocal(zkProperties);
System.out.println("done");
//start local kafka broker
kafka = new KafkaServerStartable(kafkaConfig);
System.out.println("starting local kafka broker...");
kafka.startup();
System.out.println("done");
}
public void stop(){
//stop kafka broker
System.out.println("stopping kafka...");
kafka.shutdown();
System.out.println("done");
}
}
以上是关于java 嵌入Kafka + Zookeeper用于测试目的。使用Apache Kafka 0.8进行测试的主要内容,如果未能解决你的问题,请参考以下文章
测开之路七十三:用kafka实现消息队列之环境搭建
在非root用户下启动kafka和zookeeper
Java面试题(Kafka篇+zookeeper 篇)
Java面试题(Kafka篇+zookeeper 篇)
zookeeper+kafka,使用Java实现消息对接读取
docker 配置 kafka+zookeeper,golang接入示例