Producer#initTransactions 不适用于 KafkaContainer

Posted

技术标签:

【中文标题】Producer#initTransactions 不适用于 KafkaContainer【英文标题】:Producer#initTransactions doesn't work with KafkaContainer 【发布时间】:2019-08-11 03:38:12 【问题描述】:

我尝试通过事务向 Kafka 发送消息。所以,我使用这个代码:

 try (Producer<Void, String> producer = createProducer(kafkaContainerBootstrapServers)) 
            producer.initTransactions();
            producer.beginTransaction();
            Arrays.stream(messages).forEach(
                message -> producer.send(new ProducerRecord<>(KAFKA_INPUT_TOPIC, message)));
            producer.commitTransaction();
        

...

private static Producer<Void, String> createProducer(String kafkaContainerBootstrapServers) 
        return new KafkaProducer<>(
            ImmutableMap.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainerBootstrapServers,
                ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString(),
                ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true,
                ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString()
            ),
            new VoidSerializer(),
            new StringSerializer());
    

如果我使用本地 Kafka,效果很好。

但如果我使用 Kafka TestContainers,它会在 producer.initTransactions() 冻结:

private static final String KAFKA_VERSION = "4.1.1";

@Rule
public KafkaContainer kafka = new KafkaContainer(KAFKA_VERSION)
    .withEmbeddedZookeeper();

如何配置 KafkaContainer 以处理事务?

【问题讨论】:

您应该查看 createProducer()。我相信它没有设置 enable.idempotence。示例配置显示在baeldung.com/kafka-exactly-once 我设置了“ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true”。 还有几件事要检查: 1. 检查 Kafka 的版本。 2.可以配置Kafka,防止生产者创建Topic。在这种情况下,您可能需要联系管理员为您创建主题。 本地我有 2.1.1 Kafka 版本。但是对于 TestContainerts,即使使用相同的版本也无法正常工作,5.1.1 for Confluent Platform - 2.1.1 for Kafka (docs.confluent.io/current/installation/…)。 【参考方案1】:

尝试使用Kafka for JUnit 代替 Kafka 测试容器。我在事务方面遇到了同样的问题,并以这种方式使它们活跃起来。

我使用的 Maven 依赖项:

<dependency>
    <groupId>net.mguenther.kafka</groupId>
    <artifactId>kafka-junit</artifactId>
    <version>2.1.0</version>
    <scope>test</scope>
</dependency>

【讨论】:

【参考方案2】:

按照@AntonLitvinenko 的建议,我使用Kafka for JUnit 遇到了异常。我的问题here。

我添加了这个依赖来修复它(参见issue):

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-test</artifactId>
    <version>2.12.0</version>
    <exclusions>
        <exclusion>
           <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
    <scope>test</scope>
</dependency>

另外,我为 kafka-junit 和 kafka_2.11 使用了2.0.1 版本:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>$kafkaVersion</version>
    <scope>test</scope>
</dependency>

【讨论】:

以上是关于Producer#initTransactions 不适用于 KafkaContainer的主要内容,如果未能解决你的问题,请参考以下文章

Producer机制

Kafka producer原理 (Scala版同步producer)

Kafka系列之-自定义Producer

Apache pulsar producer接口详解

kafka producer consumer

kafka 0.8.2 消息生产者 producer