用于 Spring Cloud 的 Kafka 绑定器不使用事务回滚
Posted
技术标签:
【中文标题】用于 Spring Cloud 的 Kafka 绑定器不使用事务回滚【英文标题】:Kafka Binders for Spring Cloud not rolling back with Transactional 【发布时间】:2021-09-19 04:00:08 【问题描述】:我正在尝试通过 Spring Cloud 在事务中向 Kafka 中的两个单独主题发送消息。在第一条和第二条消息之间抛出异常时,第一条消息仍然出现在第一个主题的消费者中,表明消息没有被回滚。这是我的代码:
@Configuration
@EnableTransactionManagement
public class KafkaChannelTester implements CommandLineRunner
ChannelHolder channelHolder;
MessageChannel messageChannel1;
MessageChannel messageChannel2;
public KafkaChannelTester(ChannelHolder channelHolder)
this.channelHolder = channelHolder;
this.messageChannel1 = channelHolder.messageChannel1();
this.messageChannel2 = channelHolder.messageChannel2();
@Override
public void run(String... args) throws Exception
transactionFail();
public void throwException() throw new RuntimeException();
@Transactional
public void transactionFail()
Message<String> message1 = MessageBuilder
.withPayload("Test-transaction-fail-"+ LocalDateTime.now())
.build();
Message<String> message2 = MessageBuilder
.withPayload("Test-transaction-fail-"+ LocalDateTime.now())
.build();
messageChannel1.send(message1);
throwException();
messageChannel2.send(message2);
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders)
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
System.out.println(pf.transactionCapable());
System.out.println(pf.getTransactionIdPrefix());
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
return tm;
application.yml 包含以下内容:
spring:
cloud:
stream:
bindings:
cloud-producer-1:
destination:
peter.cloud.test.1
cloud-producer-2:
destination:
peter.cloud.test.2
kafka:
binder:
brokers:
- testkbroker:9092
transaction:
transaction-id-prefix: transaction-1-
producer:
configuration:
enable.idempotence: true
retries: 1
acks: all
transactionManager 中的打印语句确认生产者工厂确实具有事务 id 前缀,并且具有事务能力。我该怎么做才能使交易正常进行?
【问题讨论】:
【参考方案1】:记录总是被写入日志,即使它们被回滚。默认情况下,消费者会看到回滚记录,您必须将消费者属性isolation.level
设置为read_committed
以避免获得回滚记录。
https://kafka.apache.org/documentation/#consumerconfigs_isolation.level
【讨论】:
即使隔离级别设置为 read_committed,消息仍然显示。 你从哪里打电话给transactionFail()
?如果您从CommandLineRunner.run()
方法调用,您将绕过事务拦截器。您需要将该方法移动到不同的 bean,以便可以拦截该方法以启动事务。或者,使用TransactionTemplate
手动启动事务。当消息到达KafkaProducerMessageHandler
时,如果不存在现有事务,则处理程序会分别发送一个已提交的本地事务。
我是从 CommandLineRunner.run() 调用它的。当我把它拿出来并在另一个类中运行 CommandLineRunner.run() 时,它开始作为事务工作。感谢您的帮助!【参考方案2】:
从不同的类运行事务方法。如果“run”方法和“Transactional”注解方法在同一个类中,CommandLineRunner.run() 会绕过事务拦截器,从而阻止事务方法作为事务运行。
【讨论】:
以上是关于用于 Spring Cloud 的 Kafka 绑定器不使用事务回滚的主要内容,如果未能解决你的问题,请参考以下文章
当没有运行 kafka 代理时,如何禁用 Spring Cloud Stream 以用于开发目的?
Spring Cloud Stream GCP如何重新排队失败的消息
spring.cloud.stream.kafka.bindings.<channelName>.producer.configuration 未应用