java 嵌入Kafka + Zookeeper用于测试目的。使用Apache Kafka 0.8进行测试

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何在 Java 中嵌入 Kafka 和 ZooKeeper 用于测试(基于 Apache Kafka 0.8 测试通过)的相关知识,希望对你有一定的参考价值。

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Properties;
 
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
 
/**
 * Runs a ZooKeeper server inside the current JVM on a background thread,
 * so tests can use a ZooKeeper endpoint without launching an external process.
 */
public class ZooKeeperLocal {
	
	ZooKeeperServerMain zooKeeperServer;
	
	/**
	 * Parses the given properties and launches the server on a daemon thread.
	 * The constructor returns as soon as the thread has been started; it does
	 * not wait for the server to finish initialising.
	 *
	 * @param zkProperties ZooKeeper settings passed to
	 *                     {@code QuorumPeerConfig.parseProperties}
	 * @throws RuntimeException if the properties cannot be parsed; the original
	 *                          exception is preserved as the cause
	 * @throws IOException declared by the signature and kept for callers that
	 *                     already handle it
	 */
	public ZooKeeperLocal(Properties zkProperties) throws FileNotFoundException, IOException{
		QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
		try {
			quorumConfiguration.parseProperties(zkProperties);
		} catch(Exception e) {
			throw new RuntimeException(e);
		}
 
		zooKeeperServer = new ZooKeeperServerMain();
		final ServerConfig configuration = new ServerConfig();
		configuration.readFrom(quorumConfiguration);
		
		Thread serverThread = new Thread(new Runnable() {
			public void run() {
				try {
					zooKeeperServer.runFromConfig(configuration);
				} catch (IOException e) {
					System.out.println("ZooKeeper Failed");
					e.printStackTrace(System.err);
				}
			}
		}, "zookeeper-local");
		// This class offers no way to stop the server, so the thread must not
		// be able to keep the JVM alive once the tests have finished.
		serverThread.setDaemon(true);
		serverThread.start();
	}
}
/**
 * Skeleton JUnit test that boots an embedded Kafka broker (and its ZooKeeper)
 * once before any test method runs.
 */
public class MyTest {
 
	/** Embedded broker shared by all tests in this class. */
	static KafkaLocal kafka;
 
	/**
	 * Loads the broker and ZooKeeper settings from the classpath, starts the
	 * embedded broker and then pauses for five seconds before tests run.
	 * Any failure is printed and reported through {@code fail}.
	 *
	 * NOTE(review): nothing here stops the broker afterwards; consider an
	 * AfterClass hook that calls {@code kafka.stop()}.
	 */
	@BeforeClass
	public static void startKafka(){
		try {
			//load properties
			Properties kafkaProperties = loadProperties("/kafkalocal.properties");
			Properties zkProperties = loadProperties("/zklocal.properties");
			
			//start kafka
			kafka = new KafkaLocal(kafkaProperties, zkProperties);
			Thread.sleep(5000);
		} catch (InterruptedException e) {
			// Restore the flag so the test runner can still observe the interruption.
			Thread.currentThread().interrupt();
			e.printStackTrace(System.out);
			fail("Error running local Kafka broker");
		} catch (Exception e){
			e.printStackTrace(System.out);
			fail("Error running local Kafka broker");
		}
		
		//do other things
	}
 
	/**
	 * Reads a properties file from the classpath and closes the stream afterwards.
	 *
	 * @param resource absolute classpath location, e.g. {@code /zklocal.properties}
	 * @return the loaded properties
	 * @throws java.io.IOException if the resource is missing or cannot be read
	 */
	private static Properties loadProperties(String resource) throws java.io.IOException {
		// Resolve against this test class so the lookup uses the loader that
		// actually holds the test resources.
		java.io.InputStream in = MyTest.class.getResourceAsStream(resource);
		if (in == null) {
			throw new java.io.FileNotFoundException("Classpath resource not found: " + resource);
		}
		Properties properties = new Properties();
		try {
			properties.load(in);
		} finally {
			in.close();
		}
		return properties;
	}
 
	@Test
	public void testSomething() {
	}
 
}
import java.io.IOException;
import java.util.Properties;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
 
 
/**
 * Bundles an embedded ZooKeeper instance and a Kafka broker that are started
 * together from the constructor.
 */
public class KafkaLocal {
 
	public KafkaServerStartable kafka;
	public ZooKeeperLocal zookeeper;
	
	/**
	 * Builds the broker configuration, starts ZooKeeper, then starts the broker,
	 * printing progress to standard output along the way.
	 *
	 * @param kafkaProperties settings used to build the {@code KafkaConfig}
	 * @param zkProperties    settings handed to {@code ZooKeeperLocal}
	 */
	public KafkaLocal(Properties kafkaProperties, Properties zkProperties) throws IOException, InterruptedException{
		// The broker configuration is built before ZooKeeper is launched.
		final KafkaConfig brokerConfig = new KafkaConfig(kafkaProperties);
		launchZooKeeper(zkProperties);
		launchBroker(brokerConfig);
	}
	
	/** Starts the embedded ZooKeeper and stores it in {@link #zookeeper}. */
	private void launchZooKeeper(Properties zkProperties) throws IOException {
		System.out.println("starting local zookeeper...");
		this.zookeeper = new ZooKeeperLocal(zkProperties);
		System.out.println("done");
	}
	
	/** Creates the broker, stores it in {@link #kafka} and starts it up. */
	private void launchBroker(KafkaConfig brokerConfig) {
		this.kafka = new KafkaServerStartable(brokerConfig);
		System.out.println("starting local kafka broker...");
		this.kafka.startup();
		System.out.println("done");
	}
	
	/** Shuts down the Kafka broker; the ZooKeeper instance is not touched here. */
	public void stop(){
		System.out.println("stopping kafka...");
		this.kafka.shutdown();
		System.out.println("done");
	}
	
}

以上是关于“在 Java 中嵌入 Kafka 和 ZooKeeper 用于测试(基于 Apache Kafka 0.8)”的主要内容。如果未能解决你的问题,请参考以下文章:

测开之路七十三:用kafka实现消息队列之环境搭建

在非root用户下启动kafka和zookeeper

Java面试题(Kafka篇+zookeeper 篇)

Java面试题(Kafka篇+zookeeper 篇)

zookeeper+kafka,使用Java实现消息对接读取

docker 配置 kafka+zookeeper,golang接入示例