KafkaContainer - 如何在启动()之后在 Spring Boot 中读取 kafka 容器端口作为属性 / 如何在启动之前配置 Kafka 端口
Posted
技术标签:
【中文标题】KafkaContainer - 如何在启动()之后在 Spring Boot 中读取 kafka 容器端口作为属性 / 如何在启动之前配置 Kafka 端口【英文标题】:KafkaContainer - how to read kafka container port as property in Spring Boot after start() / how to configure Kafka port before starting 【发布时间】:2021-03-12 16:36:37 【问题描述】:我正在使用KafkaContainer
进行集成测试;启动容器后,端口是随机选择的。
我可以通过container.getBootStrapServer()
获取端口,但我需要将动态主机+端口放在属性中(application-itest.yml
,在spring.kafka.bootstrap-servers
下,因为这是 Spring Kafka 选择配置我的值的地方KafkaTemplate
的bean)。
或者,如果我可以在启动KafkaContainer
时将端口设置为静态值,例如9999
,并将其设置在 yaml 文件中,它也可以工作。我也不知道怎么做。(获取值后是否需要注入另一个KafkaTemplate
的bean?)
如何启动 KafkaContainer:
public static final KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:5.4.3")
);
private AdminClient adminClient;
...
@BeforeEach
void setup()
if (adminClient == null)
kafka.start();
// here value is dynamic and is not accessible in Environment; not as `@AutoConfigureWiremock`, I have `wiremock.server` value set after)
String bootStrapServer = kafka.getBootstrapServers();
log.info("Kafka container created at: ", bootStrapServer);
adminClient = AdminClient.create(Map.of(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer
));
// create my topic
log.info("Current topic: ", topic);
adminClient.createTopics(Set.of(
new NewTopic(topic, 4, (short) 1)
));
【问题讨论】:
【参考方案1】:你可以使用@DynamicPropertySource:
需要添加的集成测试的方法级注解 具有动态值的属性到环境的集合 财产来源。此注释及其支持基础架构 最初旨在允许基于 Testcontainers 的属性 测试可以很容易地暴露给 Spring 集成测试。然而,这 功能也可以与任何形式的外部资源一起使用 生命周期在测试的 ApplicationContext 之外维护。
使用@DynamicPropertySource 注解的方法必须是静态的并且必须 有一个 DynamicPropertyRegistry 参数,用于添加 环境的一组 PropertySources 的名称-值对。价值观 是动态的,并通过供应商提供,只有在 属性已解决。
在你的测试课中:
@Container
private static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE));
@DynamicPropertySource
static void kafkaProperties(DynamicPropertyRegistry registry)
registry.add("spring.kafka.bootstrap-servers", () ->
return kafkaContainer.getHost() + ":" + kafkaContainer.getFirstMappedPort();
);
【讨论】:
【参考方案2】:我没有办法做到这一点。最后我转向EmbeddedKafka
并使用SpEL "spring.kafka.bootstrap-servers=$spring.embedded.kafka.brokers"
填充主机+端口。即使使用 @TestPropertySource(properties=xxx)
也可以。
基本上在这个帖子里:https://blog.mimacom.com/testing-apache-kafka-with-spring-boot-junit5/
请记住,属性设置应该在测试级别进行,因为当您只想在某个 itest 中使用 Kafka 时,将属性添加到 application-itest.yml
会导致 所有 itest 类查找 EmbeddedKafka在类路径中,当你没有在它们上添加@EmbeddedKafka
时会感到恐慌。
【讨论】:
以上是关于KafkaContainer - 如何在启动()之后在 Spring Boot 中读取 kafka 容器端口作为属性 / 如何在启动之前配置 Kafka 端口的主要内容,如果未能解决你的问题,请参考以下文章
Producer#initTransactions 不适用于 KafkaContainer