如何使用 testcontainers 和 spring-kafka 准备测试

Posted

技术标签:

【中文标题】如何使用 testcontainers 和 spring-kafka 准备测试【英文标题】:How to prepare tests using testcontainers and spring-kafka 【发布时间】:2020-10-19 10:14:04 【问题描述】:

我正在尝试为 kafka 消息传递设置我的集成测试,并从使用 Embedded-Kafka 转为使用 Testcontainers。给定以下 docker-compose 配置和所有集成测试的基类:

kafka-compose.yaml:

version: '3.3'

services:
  zookeeper:
    image: "wurstmeister/zookeeper"
  kafka:
    image: "wurstmeister/kafka:2.12-2.2.2"
    ports:
      - "9092:9092"
    depends_on:
      - "zookeeper"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_HOST_NAME: "$KAFKA_HOST:-localhost"
      KAFKA_ADVERTISED_PORT: "9092"
      KAFKA_CREATE_TOPICS: "recoverer-test:1:1,some-topic"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
@SpringBootTest
@Slf4j
public class IntegrationTest 
  private static final DockerComposeContainer kafkaContainer = initializeKafkaContainer();

  protected static DockerComposeContainer initializeKafkaContainer() 
    log.info(
        "Initializing kafka container. Should be called only once. Current value of the kafkaContainer: ",
        kafkaContainer);
    try 
      var kafkaContainer =
          new DockerComposeContainer(new File("src/test/resources/kafka-compose.yml"))
              .withExposedService("kafka_1", 9092);
      kafkaContainer.start();

      var bootstrapServers =
          format(
              "PLAINTEXT://%s:%s",
              kafkaContainer.getServiceHost("kafka_1", 9092),
              kafkaContainer.getServicePort("kafka_1", 9092));

      System.setProperty("spring.embedded.kafka.brokers", bootstrapServers);

      return kafkaContainer;
     catch (Throwable t) 
      log.error("Can't initialize the Kafka test container.", t);
      throw t;
    
  
  
@DirtiesContext(classMode = ClassMode.BEFORE_CLASS)
class PerformSomethingInboundAdapterTest extends IntegrationTest 

  private static final String GROUP_ID = "test-group-id";
  private static final TopicPartition PARTITION = new TopicPartition(SOME_TOPIC, 0);
  private static final Instant RECEIVED_AT = now();
  private static final CustomerNumber CUSTOMER_NUMBER = CustomerNumber.of(600830);

  @Autowired private KafkaListenerEndpointRegistry kafkaListenerRegistry;

  @Autowired private ConsumerFactory<String, String> consumerFactory;

  @Autowired private KafkaTemplate<Object, Object> kafkaTemplate;

  @MockBean private ActivateSomethingActivities activateCampaignActivities;

  private Consumer<String, String> consumer;

  private long initiallyCommittedOffset;

  @BeforeEach
  void startKafkaListener() 
    kafkaListenerRegistry.getListenerContainers().forEach(Lifecycle::start);
  

  @AfterEach
  void stopKafkaListener() 
    kafkaListenerRegistry.getListenerContainers().forEach(Lifecycle::stop);
  

  @Test
  void shouldPerformSomething() 
...
  

我遇到的问题很少:

    似乎 spring-kafka 和它的 @KafkaListeners 在使用 @SpringBootTest 注释的所有可能测试期间都处于活动状态,而不仅仅是在特定于 kafka 的测试期间。这意味着发送到 kafka 主题的消息可以被任意测试使用。首先是 spring-kafka 家伙的问题:是否可以将 spring-kafka-test 与 Testcontainers 一起使用?是否有可能停止每个测试的所有 @KafkaListener 并为特定的 @SpringBootTest 测试显式启用它们? Testcontainers 自带一个 Kafka 模块。这个使用了 confluent kafka docker 镜像,它在配置方面非常顽固。例如,您不能设置一些代理属性,也不能告诉容器在启动后应该创建哪个主题。在与这个模块苦苦挣扎之后,我决定将 docker-compose 模块与wurstmeister/kafka 图像一起使用。后一种方法的问题是,当我使用命令行 maven 运行测试时,我收到一条错误消息,告诉我 kafka 已经在 9092 端口上运行。似乎 maven 在 mvn test 期间启动了很少的 JVM,因此静态字段 kafkaContainer 被初始化了几次。为什么会这样?

【问题讨论】:

【参考方案1】:
    在类上使用 @DirtiesContext 在测试完成时关闭侦听器。

不知道#2。

【讨论】:

当我添加@DirtiesContext(classMode = ClassMode.BEFORE_CLASS) 时似乎没有任何效果。我更新了我的初始帖子,请参阅测试类。也许那里有问题。【参考方案2】:

第一个可以通过对不同的测试使用不同的上下文来克服

例子:

@ExtendWith(SpringExtension.class)
@DataJpaTest
@AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE)
@TestExecutionListeners(DependencyInjectionTestExecutionListener.class,
        FlywayTestExecutionListener.class)
@FlywayTest
@ActiveProfiles("test")
public abstract class AbstractDatabaseTest 

这是我为集成测试创建的测试,我会在需要数据库层的地方扩展此测试,您可以尝试为 Kafka 测试创建一个类似的测试。 一般来说,@SpringBootTest 会加速整个应用程序,这可能需要更长时间,我个人不喜欢它。

第二很难说。您可以尝试在initializeKafkaContainer() 中打印堆栈跟踪,以查看执行该操作的测试。或者,如果您尝试使用以前的方法,您可以在抽象类的静态块中进行初始化,然后扩展它的每个测试都将使用现有的静态容器,并且只会初始化一次。

【讨论】:

以上是关于如何使用 testcontainers 和 spring-kafka 准备测试的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 junit5 和 testcontainers 测试存储库?

使用 testcontainers 测试 kafka 和 spark

如何运行自定义 docker 镜像 testContainer

如何将 Testcontainers 与 @DataJpaTest 结合使用以避免代码重复?

如何使用 Testcontainers 发送信号?

如何使用 TestContainers + Spring Boot + oracle-xe