用于 Spring Cloud 的 Kafka 绑定器不使用事务回滚

Posted

技术标签:

【中文标题】用于 Spring Cloud 的 Kafka 绑定器不使用事务回滚【英文标题】:Kafka Binders for Spring Cloud not rolling back with Transactional 【发布时间】:2021-09-19 04:00:08 【问题描述】:

我正在尝试通过 Spring Cloud 在事务中向 Kafka 中的两个单独主题发送消息。在第一条和第二条消息之间抛出异常时,第一条消息仍然出现在第一个主题的消费者中,表明消息没有被回滚。这是我的代码:

@Configuration
@EnableTransactionManagement
public class KafkaChannelTester implements CommandLineRunner 

    ChannelHolder channelHolder;
    MessageChannel messageChannel1;
    MessageChannel messageChannel2;

    public KafkaChannelTester(ChannelHolder channelHolder) 
        this.channelHolder = channelHolder;
        this.messageChannel1 = channelHolder.messageChannel1();
        this.messageChannel2 = channelHolder.messageChannel2();
    

    @Override
    public void run(String... args) throws Exception 
        transactionFail();
    


    public void throwException() throw new RuntimeException();

    @Transactional
    public void transactionFail()
        Message<String> message1 = MessageBuilder
            .withPayload("Test-transaction-fail-"+ LocalDateTime.now())
            .build();
        Message<String> message2 = MessageBuilder
            .withPayload("Test-transaction-fail-"+ LocalDateTime.now())
            .build();
        messageChannel1.send(message1);
        throwException();
        messageChannel2.send(message2);
    
    @Bean
    public PlatformTransactionManager transactionManager(BinderFactory binders) 
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
            MessageChannel.class)).getTransactionalProducerFactory();
        System.out.println(pf.transactionCapable());
        System.out.println(pf.getTransactionIdPrefix());
        KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
        return tm;
    

application.yml 包含以下内容:

spring:
  cloud:
    stream:
      bindings:
        cloud-producer-1:
          destination:
            peter.cloud.test.1
        cloud-producer-2:
          destination:
            peter.cloud.test.2
      kafka:
        binder:
          brokers:
            - testkbroker:9092
          transaction:
            transaction-id-prefix: transaction-1-
            producer:
              configuration:
                enable.idempotence: true
                retries: 1
                acks: all

transactionManager 中的打印语句确认生产者工厂确实具有事务 id 前缀,并且具有事务能力。我该怎么做才能使交易正常进行?

【问题讨论】:

【参考方案1】:

记录总是被写入日志,即使它们被回滚。默认情况下,消费者会看到回滚记录,您必须将消费者属性isolation.level 设置为read_committed 以避免获得回滚记录。

https://kafka.apache.org/documentation/#consumerconfigs_isolation.level

【讨论】:

即使隔离级别设置为 read_committed,消息仍然显示。 你从哪里打电话给transactionFail()?如果您从CommandLineRunner.run() 方法调用,您将绕过事务拦截器。您需要将该方法移动到不同的 bean,以便可以拦截该方法以启动事务。或者,使用TransactionTemplate 手动启动事务。当消息到达KafkaProducerMessageHandler 时,如果不存在现有事务,则处理程序会分别发送一个已提交的本地事务。 我是从 CommandLineRunner.run() 调用它的。当我把它拿出来并在另一个类中运行 CommandLineRunner.run() 时,它开始作为事务工作。感谢您的帮助!【参考方案2】:

从不同的类运行事务方法。如果“run”方法和“Transactional”注解方法在同一个类中,CommandLineRunner.run() 会绕过事务拦截器,从而阻止事务方法作为事务运行。

【讨论】:

以上是关于用于 Spring Cloud 的 Kafka 绑定器不使用事务回滚的主要内容,如果未能解决你的问题,请参考以下文章

当没有运行 kafka 代理时,如何禁用 Spring Cloud Stream 以用于开发目的?

Spring Cloud Stream GCP如何重新排队失败的消息

spring.cloud.stream.kafka.bindings.<channelName>.producer.configuration 未应用

无法解析我的Json对象,该对象在Spring Cloud流中通过绑定器接收

Spring Cloud入门 - 微服务与消息驱动

Spring Cloud Azure 参考文档