如何使用 EmbeddedKafkaRule/EmbeddedKafka 设置 Spring Kafka 测试以修复 TopicExistsException 间歇性错误?

Posted

技术标签:

【中文标题】如何使用 EmbeddedKafkaRule/EmbeddedKafka 设置 Spring Kafka 测试以修复 TopicExistsException 间歇性错误?【英文标题】:How to set up Spring Kafka test using EmbeddedKafkaRule/ EmbeddedKafka to fix TopicExistsException Intermittent Error? 【发布时间】:2020-05-11 22:05:02 【问题描述】:

我在测试我的 Kafka 消费者和生产者时遇到了问题。集成测试间歇性失败,TopicExistsException

这就是我当前的测试类 - UserEventListenerTest 对于其中一位消费者的样子:

@SpringBootTest(properties = ["application.kafka.user-event-topic=user-event-topic-UserEventListenerTest",
    "application.kafka.bootstrap=localhost:2345"])
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class UserEventListenerTest 
    private val logger: Logger = LoggerFactory.getLogger(javaClass)

    @Value("\$application.kafka.user-event-topic")
    private lateinit var userEventTopic: String

    @Autowired
    private lateinit var kafkaConfigProperties: KafkaConfigProperties

    private lateinit var embeddedKafka: EmbeddedKafkaRule
    private lateinit var sender: KafkaSender<String, UserEvent>
    private lateinit var receiver: KafkaReceiver<String, UserEvent>

    @BeforeAll
    fun setup() 
        embeddedKafka = EmbeddedKafkaRule(1, false, userEventTopic)
        embeddedKafka.kafkaPorts(kafkaConfigProperties.bootstrap.substringAfterLast(":").toInt())
        embeddedKafka.before()

        val producerProps: HashMap<String, Any> = hashMapOf(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaConfigProperties.bootstrap,
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringSerializer",
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to "com.project.userservice.config.MockAvroSerializer"
        )
        val senderOptions = SenderOptions.create<String, UserEvent>(producerProps)
        sender = KafkaSender.create(senderOptions)

        val consumerProps: HashMap<String, Any> = hashMapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaConfigProperties.bootstrap,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringDeserializer",
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to kafkaConfigProperties.deserializer,
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
            "schema.registry.url" to kafkaConfigProperties.schemaRegistry,
            ConsumerConfig.GROUP_ID_CONFIG to "test-consumer"
        )
        val receiverOptions = ReceiverOptions.create<String, UserEvent>(consumerProps)
            .subscription(Collections.singleton("some-topic-after-UserEvent"))
        receiver = KafkaReceiver.create(receiverOptions)
    


// Some tests
// Not shown as they are irrelevant
...
...
...

UserEventListener 类使用来自user-event-topic-UserEventListenerTest 的消息并将消息发布到some-topic-after-UserEvent

正如您从设置中看到的那样,我有一个测试生产者将向user-event-topic-UserEventListenerTest 发布消息,以便我可以测试UserEventListener 是否使用该消息,以及一个将使用来自@987654329 的消息的测试消费者@以便我可以查看UserEventListener处理记录后是否向some-topic-after-UserEvent发布消息。

KafkaConfigProperties 类如下。

@Component
@ConfigurationProperties(prefix = "application.kafka")
data class KafkaConfigProperties(
    var bootstrap: String = "",
    var schemaRegistry: String = "",
    var deserializer: String = "",
    var userEventTopic: String = "",
)

application.yml 看起来像这样。

application:
  kafka:
    user-event-topic: "platform.user-events.v1"
    bootstrap: "localhost:9092"
    schema-registry: "http://localhost:8081"
    deserializer: com.project.userservice.config.MockAvroDeserializer

错误日志

com.project.userservice.user.UserEventListenerTest > initializationError FAILED
    kafka.common.KafkaException:
        at org.springframework.kafka.test.EmbeddedKafkaBroker.createTopics(EmbeddedKafkaBroker.java:354)
        at org.springframework.kafka.test.EmbeddedKafkaBroker.lambda$createKafkaTopics$4(EmbeddedKafkaBroker.java:341)
        at org.springframework.kafka.test.EmbeddedKafkaBroker.doWithAdmin(EmbeddedKafkaBroker.java:368)
        at org.springframework.kafka.test.EmbeddedKafkaBroker.createKafkaTopics(EmbeddedKafkaBroker.java:340)
        at org.springframework.kafka.test.EmbeddedKafkaBroker.afterPropertiesSet(EmbeddedKafkaBroker.java:284)
        at org.springframework.kafka.test.rule.EmbeddedKafkaRule.before(EmbeddedKafkaRule.java:114)
        at com.project.userservice.user.UserEventListenerTest.setup(UserEventListenerTest.kt:62)

        Caused by:
        java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'user-event-topic-UserEventListenerTest' already exists.
            at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
            at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
            at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
            at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:272)
            at org.springframework.kafka.test.EmbeddedKafkaBroker.createTopics(EmbeddedKafkaBroker.java:351)
            ... 6 more

            Caused by:
            org.apache.kafka.common.errors.TopicExistsException: Topic 'user-event-topic-UserEventListenerTest' already exists.

我尝试过的:

通过指定引导配置,例如在每个测试中使用不同的引导服务器地址@SpringBootTest(properties = ["application.kafka.bootstrap=localhost:2345"]) 通过@SpringBootTest 覆盖主题配置,在每个测试中使用不同的主题名称,就像上一个要点中的引导服务器覆盖一样 为每个测试类添加@DirtiesContext

软件包版本

Kotlin 1.3.61 Spring Boot - 2.2.3.RELEASE io.projectreactor.kafka:reactor-kafka:1.2.2.RELEASE org.springframework.kafka:spring-kafka-test:2.3.4.RELEASE(仅测试实现)

问题

我有多个使用EmbeddedKafkaRule 的测试类,并且设置或多或少相同。对于它们中的每一个,我指定了不同的 kafka 引导服务器地址和主题名称,但我仍然间歇性地看到 TopicAlreadyExists 异常。

我可以做些什么来使我的测试类保持一致?

【问题讨论】:

【参考方案1】:

我指定了不同的 kafka 引导服务器地址和主题名称,但我仍然间歇性地看到 TopicAlreadyExists 异常

这毫无意义;如果他们每次都有新的端口,尤其是新的主题名称,那么主题不可能已经存在。

一些建议:

    由于你使用的是JUnit5,所以不要使用JUnit4 EmbeddedKafkaRule,改用EmbeddedKafkaBroker;或者简单地添加@EmbeddedKafka,代理将作为bean添加到Spring应用程序上下文及其由Spring管理的生命周期(使用@DirtiesContext销毁);对于非 Spring 测试,代理将由 JUnit5 EmbeddedKafkaCondition 创建(和销毁),并可通过 EmbeddedKafkaCondition.getBroker() 获得。 不要使用显式端口;让代理使用其默认随机端口并使用embeddedKafka.getBrokersAsString() 作为引导服务器属性。 如果您必须自己管理代理(@BeforeAll),请destroy()@AfterAll

【讨论】:

我没有 EmbeddedKafkaRule 用于 JUnit4,我会根据您的建议尝试更新我的测试类。回复:显式端口,我目前通过注入我的 kafka 侦听器类来获取代理地址,即通过KafkaConfigProperties 这是一个ConfigurationProperties,它从application.yml 获取它的值。我尝试将值更新为$spring.embedded.kafka.brokers,但没有成功。我认为原因可能是KafkaConfigPropertiesEmbeddedKafka 设置该值之前被初始化。对此有何建议? 实际上,在我将其更改为使用@EmbeddedKafka 注释后,测试始终如一。虽然有一个问题,我没有指定主题,但主题已正确创建。是因为 kafka 监听器类 - UserEventListener?即如果主题不存在,消费者是否会自动创建主题? allow.auto.create.topics 默认为 true? 对于第一条评论,设置bootstrapServersProperty on the annotation 覆盖yaml中的值。是的,经纪人会auto-create topics, the property is true by default。

以上是关于如何使用 EmbeddedKafkaRule/EmbeddedKafka 设置 Spring Kafka 测试以修复 TopicExistsException 间歇性错误?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用本机反应创建登录以及如何验证会话

如何在自动布局中使用约束标识符以及如何使用标识符更改约束? [迅速]

如何使用 AngularJS 的 ng-model 创建一个数组以及如何使用 jquery 提交?

如何使用laravel保存所有行数据每个行名或相等

如何使用 Math.Net 连接矩阵。如何使用 Math.Net 调用特定的行或列?

WSARecv 如何使用 lpOverlapped?如何手动发出事件信号?