集成测试之嵌入式Kafka

Posted 软件测试那些事

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了集成测试之嵌入式Kafka相关的知识,希望对你有一定的参考价值。

      在项目中,团队也使用了Kafka作为消息中间件。经过了嵌入式redis选型的问题之后,笔者在嵌入式kafka选型时就更倾向于还在持续更新,并且维护人员是一个团队而不是个人或者松散的组织。最终,笔者选择了来自salesforce的开源项目

<groupId>com.salesforce.kafka.test</groupId>
<artifactId>kafka-junit</artifactId>
<version>3.0.1</version>

以下是在项目自带的测试用例代码上稍加修改的案例。

package com.salesforce.kafka.test.junit4;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.util.Collection;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.common.Node;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.salesforce.kafka.test.KafkaBroker;
import com.salesforce.kafka.test.KafkaTestServer;

public class KafkaBrokerTest {
private static final Logger logger = LoggerFactory.getLogger(KafkaBrokerTest.class);

/**
* Validate that we started 2 brokers.
* @throws Exception
*/

@Test
public void testTwoBrokersStarted() throws Exception {
Properties overrideProperties = new Properties();
overrideProperties.put("broker.id", "1");
overrideProperties.put("port", "6666");
KafkaTestServer server1= new KafkaTestServer(overrideProperties);
server1.start();
String string= server1.getKafkaBrokers().getBrokerById(1).getConnectString();
logger.info("\n"+string+"\n");

overrideProperties.put("broker.id", "2");
overrideProperties.put("port", "8888");
KafkaTestServer server2= new KafkaTestServer(overrideProperties);
server2.start();
String string2= server2.getKafkaBrokers().getBrokerById(2).getConnectString();
logger.info("\n"+string2+"\n");

}

}

问题

从上述案例中可以看出,在实际的kafka使用中,IP+端口号是每个kafka broker都不一样的。但是在salesforce/kafka-core中提供的KafkaTestCluster类中,其并没有给外部来指定某个broker port的机会。

 /**
* Starts the cluster.
* @throws Exception on startup errors.
* @throws TimeoutException When the cluster fails to start up within a timely manner.
*/

public void start() throws Exception, TimeoutException {
// Ensure zookeeper instance has been started.
zkTestServer.start();

// If we have no brokers defined yet...
if (brokers.isEmpty()) {
// Loop over brokers, starting with brokerId 1.
for (int brokerId = 1; brokerId <= numberOfBrokers; brokerId++) {
// Create properties for brokers
final Properties brokerProperties = new Properties();

// Add user defined properties.
brokerProperties.putAll(overrideBrokerProperties);

// Set broker.id
brokerProperties.put("broker.id", String.valueOf(brokerId));

// Create new KafkaTestServer and add to our broker list
brokers.add(
new KafkaTestServer(brokerProperties, zkTestServer)
);
}
}

// Loop over each broker and start it
for (final KafkaTestServer broker : brokers) {
broker.start();
}

// Block until the cluster is 'up' or the timeout is exceeded.
waitUntilClusterReady(10_000L);
}

这在某些需要预先指定IP+端口号的场景中还是有一些麻烦的。需要后续进行处理。例如,给这个类新增一个构造方法,利用以下的List ,把已经完成初始化的List<KafkaTestServer> brokers 作为入参传递进去。

private final List<KafkaTestServer> brokers = new ArrayList<>();

新增构造方法:

public KafkaTestCluster(final List<KafkaTestServer> brokers)


以上是关于集成测试之嵌入式Kafka的主要内容,如果未能解决你的问题,请参考以下文章

春天Kafka集成测试听众不工作

在使用 Springboot 运行集成测试时启动嵌入式 gRPC 服务器

测试基础之集成测试(初入行)

持续集成之Gitlab安装与应用

软件开发之持续集成

嵌入式代码单元/集成测试工具VectorCAST