KafkaContainer - 如何在启动()之后在 Spring Boot 中读取 kafka 容器端口作为属性 / 如何在启动之前配置 Kafka 端口

Posted

技术标签:

【中文标题】KafkaContainer - 如何在启动()之后在 Spring Boot 中读取 kafka 容器端口作为属性 / 如何在启动之前配置 Kafka 端口【英文标题】:KafkaContainer - how to read kafka container port as property in Spring Boot after start() / how to configure Kafka port before starting 【发布时间】:2021-03-12 16:36:37 【问题描述】:

我正在使用KafkaContainer 进行集成测试;启动容器后,端口是随机选择的。

我可以通过container.getBootStrapServer() 获取端口,但我需要将动态主机+端口放在属性中(application-itest.yml,在spring.kafka.bootstrap-servers 下,因为这是 Spring Kafka 选择配置我的值的地方KafkaTemplate的bean)。

或者,如果我可以在启动KafkaContainer 时将端口设置为静态值,例如9999,并将其设置在 yaml 文件中,它也可以工作。我也不知道怎么做。(获取值后是否需要注入另一个KafkaTemplate的bean?)

如何启动 KafkaContainer:

    public static final KafkaContainer kafka = new KafkaContainer(
            DockerImageName.parse("confluentinc/cp-kafka:5.4.3")
    );
    private AdminClient adminClient;

    ...

    @BeforeEach
    void setup() 
        if (adminClient == null) 
            kafka.start();
            // here value is dynamic and is not accessible in Environment; not as `@AutoConfigureWiremock`, I have `wiremock.server` value set after)
            String bootStrapServer = kafka.getBootstrapServers();
            log.info("Kafka container created at: ", bootStrapServer);
            adminClient = AdminClient.create(Map.of(
                    AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer
            ));
            // create my topic
            log.info("Current topic: ", topic);
            adminClient.createTopics(Set.of(
                    new NewTopic(topic, 4, (short) 1)
            ));
        
    

【问题讨论】:

【参考方案1】:

你可以使用@DynamicPropertySource:

需要添加的集成测试的方法级注解 具有动态值的属性到环境的集合 财产来源。此注释及其支持基础架构 最初旨在允许基于 Testcontainers 的属性 测试可以很容易地暴露给 Spring 集成测试。然而,这 功能也可以与任何形式的外部资源一起使用 生命周期在测试的 ApplicationContext 之外维护。

使用@DynamicPropertySource 注解的方法必须是静态的并且必须 有一个 DynamicPropertyRegistry 参数,用于添加 环境的一组 PropertySources 的名称-值对。价值观 是动态的,并通过供应商提供,只有在 属性已解决。

在你的测试课中:

@Container
private static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE));

@DynamicPropertySource
static void kafkaProperties(DynamicPropertyRegistry registry) 
    registry.add("spring.kafka.bootstrap-servers", () ->  
        return kafkaContainer.getHost() + ":" + kafkaContainer.getFirstMappedPort();
    );    

【讨论】:

【参考方案2】:

我没有办法做到这一点。最后我转向EmbeddedKafka 并使用SpEL "spring.kafka.bootstrap-servers=$spring.embedded.kafka.brokers" 填充主机+端口。即使使用 @TestPropertySource(properties=xxx) 也可以。

基本上在这个帖子里:https://blog.mimacom.com/testing-apache-kafka-with-spring-boot-junit5/

请记住,属性设置应该在测试级别进行,因为当您只想在某个 itest 中使用 Kafka 时,将属性添加到 application-itest.yml 会导致 所有 itest 类查找 EmbeddedKafka在类路径中,当你没有在它们上添加@EmbeddedKafka 时会感到恐慌。

【讨论】:

以上是关于KafkaContainer - 如何在启动()之后在 Spring Boot 中读取 kafka 容器端口作为属性 / 如何在启动之前配置 Kafka 端口的主要内容,如果未能解决你的问题,请参考以下文章

Producer#initTransactions 不适用于 KafkaContainer

[dubbo 源码之 ]2. 服务消费方如何启动服务

推荐系统之冷启动问题

深入浅出mybatis之启动详解

调试Idea如何远程debug之SpringBoot jar包启动

SpringBoot系列之tomcat内嵌web容器是如何启动的