spring boot kafka 在将 testcontainers 与 kafka、zookeeper、模式注册表一起使用时因“代理可能不可用”而失败
Posted
技术标签:
【中文标题】spring boot kafka 在将 testcontainers 与 kafka、zookeeper、模式注册表一起使用时因“代理可能不可用”而失败【英文标题】:spring boot kafka failed with "Broker may not be available" while using testcontainers with kafka, zookeeper, schema registry 【发布时间】:2021-05-23 20:58:36 【问题描述】:我已经在测试代码中运行了如下所示的测试容器。
@Testcontainers
public class TestEnvironmentSupport
static String version = "5.4.0";
static DockerImageName kafkaImage = DockerImageName.parse("confluentinc/cp-server").withTag(version);
static DockerImageName zookeeperImage = DockerImageName.parse("confluentinc/cp-zookeeper").withTag(version);
static DockerImageName schemaRegistryImage = DockerImageName.parse("confluentinc/cp-schema-registry").withTag(version);
static Network network = Network.newNetwork();
@Container
static GenericContainer zookeeper = new GenericContainer<>(zookeeperImage)
.withNetwork(network)
.withCreateContainerCmdModifier(cmd -> cmd.withHostName("zookeeper"))
.withExposedPorts(2181)
.withEnv("ZOOKEEPER_CLIENT_PORT", "2181")
.withEnv("ZOOKEEPER_TICK_TIME", "2000");
@Container
static GenericContainer kafka = new GenericContainer<>(kafkaImage)
.withNetwork(network)
.withCreateContainerCmdModifier(cmd -> cmd.withHostName("kafka"))
.withExposedPorts(9092)
.dependsOn(zookeeper)
.withEnv("KAFKA_BROKER_ID", "1")
.withEnv("KAFKA_ZOOKEEPER_CONNECT", "zookeeper:2181")
.withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT")
.withEnv("KAFKA_ADVERTISED_LISTENERS", "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092")
.withEnv("KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL", "schema-registry:8081");
@Container
static GenericContainer schemaRegistry = new GenericContainer<>(schemaRegistryImage)
.withNetwork(network)
.withCreateContainerCmdModifier(cmd -> cmd.withHostName("schema-registry"))
.withExposedPorts(8081)
.dependsOn(zookeeper, kafka)
.withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
.withEnv("SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL", "zookeeper:2181");
@Test
void test()
assertTrue(zookeeper.isRunning());
assertTrue(kafka.isRunning());
assertTrue(schemaRegistry.isRunning());
它工作得很好。 但是当我尝试使用上面的 testcontainer 配置运行 spring boot 测试时出现了问题,因为 testcontainer 动态生成代理端口,但是 NetworkClient 不断地使用 localhost:9092 访问代理,即使我动态覆盖了@SpringBootTest 代码上的属性,如下所示
@DynamicPropertySource
static void testcontainerProperties(final DynamicPropertyRegistry registry)
var bootstrapServers = kafka.getHost() + ":" + kafka.getMappedPort(9092);
var schemaRegistryUrl = "http://" + schemaRegistry.getHost() + ":" + schemaRegistry.getMappedPort(8081);
registry.add("spring.cloud.stream.kafka.binder.brokers", () -> bootstrapServers);
registry.add("bootstrap.servers", () -> bootstrapServers);
registry.add("schema.registry.url", () -> schemaRegistryUrl);
registry.add("spring.cloud.stream.kafka.default.consumer.configuration.schema.registry.url", () -> schemaRegistryUrl);
下面是AdminClientConfig的log on启动时间,它显示了bootstrap.servers = [localhost:56001]
的端口被testcontainer动态绑定。
2021-02-21 20:28:52.291 INFO 78241 --- [ Test worker] o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
bootstrap.servers = [localhost:56013]
client.dns.lookup = use_all_dns_ips
即使我这样设置,它仍然会尝试连接到 localhost:9092,如下所示。
2021-02-21 20:28:52.457 INFO 78241 --- [ Test worker] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1613906932454
2021-02-21 20:28:53.095 WARN 78241 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2021-02-21 20:28:53.202 WARN 78241 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2021-02-21 20:28:53.407 WARN 78241 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
下面是 docker ps
在运行 spring boot 测试时的结果。
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
e340d9e15fe4 confluentinc/cp-schema-registry:5.4.0 "/etc/confluent/dock…" 46 seconds ago Up 46 seconds 0.0.0.0:56014->8081/tcp optimistic_joliot
ad3bf06df4b3 confluentinc/cp-server:5.4.0 "/etc/confluent/dock…" 55 seconds ago Up 54 seconds 0.0.0.0:56013->9092/tcp infallible_brown
f7fa5f4ae23c confluentinc/cp-zookeeper:5.4.0 "/etc/confluent/dock…" About a minute ago Up 59 seconds 0.0.0.0:56012->2181/tcp, 0.0.0.0:56011->2888/tcp, 0.0.0.0:56010->3888/tcp agitated_leavitt
b1c036cdf00b testcontainers/ryuk:0.3.0 "/app" About a minute ago Up About a minute 0.0.0.0:56009->8080/tcp testcontainers-ryuk-68190eaa-8513-4dd8-ab67-175275f15a82
我尝试使用 docker compose 模块运行 testcontainers,但它有同样的问题。 我究竟做错了什么? 请帮忙。
【问题讨论】:
【参考方案1】:它正在尝试连接到localhost:9092
,因为您尝试连接到通告的PLAINTEXT_HOST
端口,这就是它要返回的地址。您不需要为测试宣传两个侦听器,因此请尝试直接使用kafka:29092
而不是调用映射端口方法。此外,除非您特别需要服务器端架构验证,否则您只需要 confluentinc/cp-kafka
图像。
spring-kafka 嵌入式代理也应该可以运行您的测试,因此您不需要测试容器
【讨论】:
以上是关于spring boot kafka 在将 testcontainers 与 kafka、zookeeper、模式注册表一起使用时因“代理可能不可用”而失败的主要内容,如果未能解决你的问题,请参考以下文章
docker快速安装kafka,zookeeper ,体验spring-boot-demo-mq-kafka