将 kafka 与 jpa 一起使用时的良好做法

Posted

技术标签:

【中文标题】将 kafka 与 jpa 一起使用时的良好做法【英文标题】:Good practice when using kafka with jpa 【发布时间】:2018-12-05 17:41:18 【问题描述】:

我目前在一个使用 JPA 和 Kafka 的项目中。我正在尝试找到一套组合这些操作的良好做法。

在现有代码中,生产者与 jpa 在同一事务中使用,但是,根据我的阅读,他们似乎不共享事务。

@PostMapping
@Transactional
public XDto createX(@RequestBody XRequest request) 
    Xdto dto = xService.create(request);
    kafkaProducer.putToQueue(dto, Type.CREATE);
    return dto;

其中kafka生产者定义如下:

public class KafkaProducer 
    @Autowired
    private KafkaTemplate<String, Type> template;

    public void putToQueue(Dto dto, Type eventType) 
        template.send("event", new Event(dto, eventType));
    

这是组合 jpa 和 kafka 的有效用例吗,是否正确定义了事务边界?

【问题讨论】:

您是否正在尝试实现OLTP系统的变更数据捕获? 【参考方案1】:

当事务失败时,这将无法按预期工作。 kafka 交互不是事务的一部分。

您可能想看看TransactionalEventListener 您可能想在 AFTER_COMMIT 事件上将消息写入 kafka。即使这样,kafka 发布也可能会失败。

另一种选择是使用 jpa 写入 db,就像你正在做的那样。让debezium从你的数据库中读取更新后的数据并推送到kafka。活动将采用不同的形式,但会更加丰富。

【讨论】:

【参考方案2】:

通过查看您的问题,我假设您正在尝试实现 OLTP 系统的 CDC(更改数据捕获),即记录将进入事务数据库的每个更改。有两种方法可以解决这个问题。

    应用程序代码对事务数据库和 Kafka 进行双重写入。它不一致并妨碍性能。不一致,因为当您对两个独立的系统进行双重写入时,如果其中一个写入失败并且在事务流中将数据推送到 Kafka 会增加延迟,您不想在此问题上妥协。 从数据库提交中提取更改(数据库/应用程序级触发器或事务日志)并将其发送到 Kafka。它非常一致,根本不会影响您的交易。一致,因为数据库提交日志是成功提交后数据库事务的反映。有很多可用的解决方案可以利用这种方法,例如 databus、maxwell、debezium 等。

如果 CDC 是您的用例,请尝试使用任何现有的解决方案。

【讨论】:

【参考方案3】:

正如其他人所说,您可以使用更改数据捕获将应用于数据库的更改安全地传播到 Apache Kafka。您不能在单个事务中更新数据库和 Kafka,因为后者不支持任何类型的 2 阶段提交协议。

您可以对表本身进行 CDC,或者,如果您希望对发送给 Kafka 的结构进行更多控制,请应用“发件箱”模式。在这种情况下,您的应用程序将写入其实际的业务表以及包含要发送到 Kafka 的消息的“发件箱”表。您可以在blog post 中找到此方法的详细说明。

免责声明:我是这篇文章的作者,也是 Debezium 的负责人,Debezium 是其他一些答案中提到的 CDC 解决方案之一。

【讨论】:

【参考方案4】:

您不应该将发送到 kafka 的消息放在事务中。如果您需要在无法向 kafka 发送事件,然后恢复事务时的逻辑,在这种情况下使用 spring-retry 会更好。只需将向kafka发送事件相关的代码放在@Retryable注解的方法中,并在@Recover注解的方法中添加恢复之前对DB所做的更改的逻辑。

【讨论】:

以上是关于将 kafka 与 jpa 一起使用时的良好做法的主要内容,如果未能解决你的问题,请参考以下文章

在 Erlang(和 Riak)中开发应用程序时的良好做法?

将 Spring Boot 与 JPA 一起使用时如何持久化

引用对象的良好做法

何时将 EntityManager.find() 与 EntityManager.getReference() 与 JPA 一起使用

将 JPA Spring 引导与 SQL Server 一起使用

JPA 2 / Hibernate 孤儿删除仍然无法与@OneToMany 一起使用?