Producer#initTransactions 不适用于 KafkaContainer
Posted
技术标签:
【中文标题】Producer#initTransactions 不适用于 KafkaContainer【英文标题】:Producer#initTransactions doesn't work with KafkaContainer 【发布时间】:2019-08-11 03:38:12 【问题描述】:我尝试通过事务向 Kafka 发送消息。所以,我使用这个代码:
try (Producer<Void, String> producer = createProducer(kafkaContainerBootstrapServers))
producer.initTransactions();
producer.beginTransaction();
Arrays.stream(messages).forEach(
message -> producer.send(new ProducerRecord<>(KAFKA_INPUT_TOPIC, message)));
producer.commitTransaction();
...
private static Producer<Void, String> createProducer(String kafkaContainerBootstrapServers)
return new KafkaProducer<>(
ImmutableMap.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainerBootstrapServers,
ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString(),
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true,
ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString()
),
new VoidSerializer(),
new StringSerializer());
如果我使用本地 Kafka,效果很好。
但如果我使用 Kafka TestContainers,它会在 producer.initTransactions()
冻结:
private static final String KAFKA_VERSION = "4.1.1";
@Rule
public KafkaContainer kafka = new KafkaContainer(KAFKA_VERSION)
.withEmbeddedZookeeper();
如何配置 KafkaContainer 以处理事务?
【问题讨论】:
您应该查看 createProducer()。我相信它没有设置 enable.idempotence。示例配置显示在baeldung.com/kafka-exactly-once 我设置了“ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true”。 还有几件事要检查: 1. 检查 Kafka 的版本。 2.可以配置Kafka,防止生产者创建Topic。在这种情况下,您可能需要联系管理员为您创建主题。 本地我有 2.1.1 Kafka 版本。但是对于 TestContainerts,即使使用相同的版本也无法正常工作,5.1.1 for Confluent Platform - 2.1.1 for Kafka (docs.confluent.io/current/installation/…)。 【参考方案1】:尝试使用Kafka for JUnit 代替 Kafka 测试容器。我在事务方面遇到了同样的问题,并以这种方式使它们活跃起来。
我使用的 Maven 依赖项:
<dependency>
<groupId>net.mguenther.kafka</groupId>
<artifactId>kafka-junit</artifactId>
<version>2.1.0</version>
<scope>test</scope>
</dependency>
【讨论】:
【参考方案2】:按照@AntonLitvinenko 的建议,我使用Kafka for JUnit 遇到了异常。我的问题here。
我添加了这个依赖来修复它(参见issue):
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>2.12.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
另外,我为 kafka-junit 和 kafka_2.11 使用了2.0.1
版本:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>$kafkaVersion</version>
<scope>test</scope>
</dependency>
【讨论】:
以上是关于Producer#initTransactions 不适用于 KafkaContainer的主要内容,如果未能解决你的问题,请参考以下文章