Spring Kafka/Spring Cloud Stream 如何保证涉及数据库和 Kafka 的事务性/原子性?

Posted

技术标签:

【中文标题】Spring Kafka/Spring Cloud Stream 如何保证涉及数据库和 Kafka 的事务性/原子性?【英文标题】:How does Spring Kafka/Spring Cloud Stream guarantee the transactionality / atomicity involving a Database and Kafka? 【发布时间】:2019-09-16 00:31:41 【问题描述】:

Spring Kafka,因此Spring Cloud Stream,允许我们创建事务生产者和处理器。我们可以在其中一个示例项目中看到该功能的实际应用:https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/transaction-kafka-samples:

@Transactional
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public PersonEvent process(PersonEvent data) 
        logger.info("Received event=", data);
        Person person = new Person();
        person.setName(data.getName());

        if(shouldFail.get()) 
            shouldFail.set(false);
            throw new RuntimeException("Simulated network error");
         else 
            //We fail every other request as a test
            shouldFail.set(true);
        
        logger.info("Saving person=", person);

        Person savedPerson = repository.save(person);

        PersonEvent event = new PersonEvent();
        event.setName(savedPerson.getName());
        event.setType("PersonSaved");
        logger.info("Sent event=", event);
        return event;
    

在这段摘录中,有一个从 Kafka 主题读取,在数据库中写入和另一个对另一个 Kafka 主题的写入,所有这些都是事务性的。

我想知道并想回答的是,这在技术上是如何实现和实施的。

由于数据源和 Kafka 不参与 XA 事务(2 阶段提交),那么实现如何保证本地事务可以从 Kafka 读取、提交到数据库并以事务方式将所有这些写入 Kafka?

【问题讨论】:

【参考方案1】:

没有保证,只有在 Kafka 内部。

Spring 提供了事务同步,所以提交很接近,但是数据库可以提交而 Kafka 没有。所以你必须处理重复的可能性。

当直接使用 spring-kafka 时,正确的方法不是使用@Transactional,而是在侦听器容器中使用ChainedKafkaTransactionManager

见Transaction Synchronization。

另请参阅Distributed transactions in Spring, with and without XA 和“Best Efforts 1PC 模式”作为背景。

但是,对于 Stream,不支持链式事务管理器,因此需要 @Transactional(使用 DB 事务管理器)。这将为链式 tx 管理器提供类似的结果,数据库首先提交,就在 Kafka 之前。

【讨论】:

现在清楚多了!只有一件事,当你说:“所以你必须处理重复的可能性。”你的意思是处理器会再次处理输入消息,并且会有另一个数据库操作? 是的,默认情况下,当kafka事务回滚时,我们会重新寻找分区,这样记录会被重新传递。如果数据库已提交,那么您可能会得到重复的数据库记录,除非您有一些重复数据删除逻辑。 嗨@GaryRussell,所以如果我使用一个监听器(消费者)来处理具有两个操作(BD和kafka)的事务,那么@Transactional(在spring cloud stream)就是我所需要的。可以吗?我很困惑尝试在我的 Spring 云流项目中添加 ChainedTransactionManager。 这个答案是 18 个月大的;您现在可以将ChainedKafkaTransactionManager 注入到侦听器容器中。有关自定义程序示例,请参阅 this answer。使用container.getContainerProperties().setTransactionManager(new ChainedKafkaTransactionManager(container.getContainerProperties().getTransactionManager(), dbtm)。您必须从酒店获取原始 KTM 并将其添加到 ChainedKafkaTransactionManager

以上是关于Spring Kafka/Spring Cloud Stream 如何保证涉及数据库和 Kafka 的事务性/原子性?的主要内容,如果未能解决你的问题,请参考以下文章

如何在默认情况下从 Kafka Spring Cloud Stream 消费并消费由 Confluent API 生成的 Kafka 消息?

kafka:spring集成 kafka(springboot集成客户端集成)

spring boot 配置使用kafka

spring集成kafka

Spring Integration 与 Kafka 自动配置问题

Spring Boot Kafka监听器不一致