如何使用 testcontainers 和 spring-kafka 准备测试
Posted
技术标签:
【中文标题】如何使用 testcontainers 和 spring-kafka 准备测试【英文标题】:How to prepare tests using testcontainers and spring-kafka 【发布时间】:2020-10-19 10:14:04 【问题描述】:我正在尝试为 kafka 消息传递设置我的集成测试,并从使用 Embedded-Kafka 转为使用 Testcontainers。给定以下 docker-compose 配置和所有集成测试的基类:
kafka-compose.yaml:
version: '3.3'
services:
zookeeper:
image: "wurstmeister/zookeeper"
kafka:
image: "wurstmeister/kafka:2.12-2.2.2"
ports:
- "9092:9092"
depends_on:
- "zookeeper"
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_HOST_NAME: "$KAFKA_HOST:-localhost"
KAFKA_ADVERTISED_PORT: "9092"
KAFKA_CREATE_TOPICS: "recoverer-test:1:1,some-topic"
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
@SpringBootTest
@Slf4j
public class IntegrationTest
private static final DockerComposeContainer kafkaContainer = initializeKafkaContainer();
protected static DockerComposeContainer initializeKafkaContainer()
log.info(
"Initializing kafka container. Should be called only once. Current value of the kafkaContainer: ",
kafkaContainer);
try
var kafkaContainer =
new DockerComposeContainer(new File("src/test/resources/kafka-compose.yml"))
.withExposedService("kafka_1", 9092);
kafkaContainer.start();
var bootstrapServers =
format(
"PLAINTEXT://%s:%s",
kafkaContainer.getServiceHost("kafka_1", 9092),
kafkaContainer.getServicePort("kafka_1", 9092));
System.setProperty("spring.embedded.kafka.brokers", bootstrapServers);
return kafkaContainer;
catch (Throwable t)
log.error("Can't initialize the Kafka test container.", t);
throw t;
@DirtiesContext(classMode = ClassMode.BEFORE_CLASS)
class PerformSomethingInboundAdapterTest extends IntegrationTest
private static final String GROUP_ID = "test-group-id";
private static final TopicPartition PARTITION = new TopicPartition(SOME_TOPIC, 0);
private static final Instant RECEIVED_AT = now();
private static final CustomerNumber CUSTOMER_NUMBER = CustomerNumber.of(600830);
@Autowired private KafkaListenerEndpointRegistry kafkaListenerRegistry;
@Autowired private ConsumerFactory<String, String> consumerFactory;
@Autowired private KafkaTemplate<Object, Object> kafkaTemplate;
@MockBean private ActivateSomethingActivities activateCampaignActivities;
private Consumer<String, String> consumer;
private long initiallyCommittedOffset;
@BeforeEach
void startKafkaListener()
kafkaListenerRegistry.getListenerContainers().forEach(Lifecycle::start);
@AfterEach
void stopKafkaListener()
kafkaListenerRegistry.getListenerContainers().forEach(Lifecycle::stop);
@Test
void shouldPerformSomething()
...
我遇到的问题很少:
-
似乎 spring-kafka 和它的
@KafkaListener
s 在使用 @SpringBootTest 注释的所有可能测试期间都处于活动状态,而不仅仅是在特定于 kafka 的测试期间。这意味着发送到 kafka 主题的消息可以被任意测试使用。首先是 spring-kafka 家伙的问题:是否可以将 spring-kafka-test 与 Testcontainers 一起使用?是否有可能停止每个测试的所有 @KafkaListener
并为特定的 @SpringBootTest
测试显式启用它们?
Testcontainers 自带一个 Kafka 模块。这个使用了 confluent kafka docker 镜像,它在配置方面非常顽固。例如,您不能设置一些代理属性,也不能告诉容器在启动后应该创建哪个主题。在与这个模块苦苦挣扎之后,我决定将 docker-compose 模块与wurstmeister/kafka
图像一起使用。后一种方法的问题是,当我使用命令行 maven 运行测试时,我收到一条错误消息,告诉我 kafka 已经在 9092 端口上运行。似乎 maven 在 mvn test
期间启动了很少的 JVM,因此静态字段 kafkaContainer
被初始化了几次。为什么会这样?
【问题讨论】:
【参考方案1】:-
在类上使用
@DirtiesContext
在测试完成时关闭侦听器。
不知道#2。
【讨论】:
当我添加@DirtiesContext(classMode = ClassMode.BEFORE_CLASS) 时似乎没有任何效果。我更新了我的初始帖子,请参阅测试类。也许那里有问题。【参考方案2】:第一个可以通过对不同的测试使用不同的上下文来克服
例子:
@ExtendWith(SpringExtension.class)
@DataJpaTest
@AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE)
@TestExecutionListeners(DependencyInjectionTestExecutionListener.class,
FlywayTestExecutionListener.class)
@FlywayTest
@ActiveProfiles("test")
public abstract class AbstractDatabaseTest
这是我为集成测试创建的测试,我会在需要数据库层的地方扩展此测试,您可以尝试为 Kafka 测试创建一个类似的测试。 一般来说,@SpringBootTest 会加速整个应用程序,这可能需要更长时间,我个人不喜欢它。
第二很难说。您可以尝试在initializeKafkaContainer()
中打印堆栈跟踪,以查看执行该操作的测试。或者,如果您尝试使用以前的方法,您可以在抽象类的静态块中进行初始化,然后扩展它的每个测试都将使用现有的静态容器,并且只会初始化一次。
【讨论】:
以上是关于如何使用 testcontainers 和 spring-kafka 准备测试的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 junit5 和 testcontainers 测试存储库?
使用 testcontainers 测试 kafka 和 spark
如何运行自定义 docker 镜像 testContainer