使用 kafka 和 cassandra 进行事件溯源的类别预测

Posted

技术标签:

【中文标题】使用 kafka 和 cassandra 进行事件溯源的类别预测【英文标题】:Category projections using kafka and cassandra for event-sourcing 【发布时间】:2019-12-13 16:02:43 【问题描述】:

我正在使用 Cassandra 和 Kafka 进行事件溯源,而且效果很好。但我最近刚刚发现了设计/设置中的一个潜在重大缺陷。简要介绍它是如何完成的:

    聚合命令处理程序基本上是一个 kafka 消费者,它消费关于某个主题的感兴趣的消息:

    1.1 当它接收到命令时,它会加载聚合的所有事件,并为每个事件重放聚合事件处理程序以使聚合达到当前状态。

    1.2 根据命令和业务逻辑,它会将一个或多个事件应用于事件存储。这涉及将新事件插入到 cassandra 中的事件存储表中。事件标记有聚合的版本号 - 从版本 0 开始用于新聚合,使预测成为可能。此外,它将事件发送到另一个主题(用于投影目的)。

    1.3 kafka 消费者将在这些事件发布后监听主题。该消费者将充当投影仪。当它接收到感兴趣的事件时,它会加载聚合的当前读取模型。它检查它收到的事件的版本是否是预期的版本,然后更新读取模型。

这似乎工作得很好。问题是当我想要 EventStore 所谓的类别投影时。我们以 Order 聚合为例。我可以轻松地投射一个或多个阅读模型 pr Order。但是,如果我想有一个包含客户 30 个最后订单的投影,那么我需要一个类别投影。

我只是在摸索如何做到这一点。我很想知道是否有其他人正在使用 Cassandra 和 Kafka 进行事件采购。我读过一些人们不鼓励它的地方。也许这就是原因。

我知道 EventStore 已内置支持此功能。也许使用 Kafka 作为事件存储会是更好的解决方案。

【问题讨论】:

您的活动主题有什么粒度?每种聚合类型有一个主题还是每个聚合实例有一个主题?鉴于 Kafka 无法扩展到数百万个主题,前者是正常的方法,这意味着您已经准备好您的类别。 一个主题公关聚合类型。但是3个分区。以及应用程序的 2 个实例(意味着同一消费者组中的两个消费者)。但现在我一直在考虑制作“全局”事件版本 pr 聚合类型的解决方案。如果我将聚合事件发送到只有一个分区的主题(主题 pr 聚合),那么我可以使用它并用全局版本标记事件,然后将版本化事件输出到另一个主题。然后我正在考虑为这个主题设置一个消费者组 pr 投影,并将投影的位置存储在数据库中。但这将失败 3 个分区 我现在能看到的唯一方法是在投影消费者收听的主题上只有一个分区。不确定这是否是最佳做法 【参考方案1】:

对于这种架构,您必须在以下之间做出选择:

每种类型的全局事件流 - 简单 按类型划分事件流 - 可扩展

除非您的系统具有相当高的吞吐量(例如,对于所讨论的流类型,持续时间每秒至少 10 秒或 100 秒的事件),否则全局流是更简单的方法。一些系统(例如事件存储)通过非常细粒度的流(例如每个聚合实例)为您提供两全其美的流,但能够将它们组合成更大的流(每个流类型/类别/分区,每个多种流类型等)以开箱即用的高性能和可预测方式,同时仍然很简单,只需要您跟踪单个全局事件位置。

如果你使用 Kafka 进行分区:

在处理需要进入相同模型的不同分区的事件时,您的投影代码将需要处理访问相同读取模型的并发消费者组。根据您的投影目标商店,有很多方法可以处理此问题(事务、乐观并发、原子操作等),但对于某些目标商店来说,这将是一个问题 您的投影代码需要跟踪每个分区的流位置,而不仅仅是单个位置。如果您的投影从多个流中读取,它必须跟踪很多位置。

使用全局流消除了这两个问题 - 性能通常可能足够好。

在任何一种情况下,您可能还希望将流位置放入长期事件存储(即 Cassandra) - 您可以通过从事件流(分区或全局)读取专用进程来做到这一点,并且只需使用每个事件的全局或分区位置更新 Cassandra 中的事件。 (我对 MongoDB 也有类似的事情——我有一个读取“oplog”并将 oplog 时间戳复制到事件中的过程,因为 oplog 时间戳是完全有序的)。

另一种选择是从初始命令处理中删除 Cassandra 并改用 Kafka Streams:

通过与聚合的分区 KTable 连接来处理分区命令流 计算命令结果和事件 以原子方式,KTable 使用更改的聚合更新,事件写入事件流,命令响应写入命令响应流。

然后,您将拥有一个下游事件处理器,它将事件复制到 Cassandra 中以便于查询等(并且它可以将 Kafka 流位置添加到每个事件,因为它可以给类别排序)。如果您不想使用 Kafka 进行长期事件存储,这有助于赶上订阅等。 (为了赶上进度,您只需从 Cassandra 尽可能多地阅读,然后从上一个 Cassandra 事件的位置切换到从 Kafka 流式传输)。另一方面,Kafka 本身可以永久存储事件,所以这并不总是必要的。

我希望这有助于了解您可能遇到的权衡和问题。

【讨论】:

以上是关于使用 kafka 和 cassandra 进行事件溯源的类别预测的主要内容,如果未能解决你的问题,请参考以下文章

是否有开源 Kafka Cassandra 连接器配置的示例示例?

Storm 中的延迟队列实现——Kafka、Cassandra、Redis 还是 Beanstalk?

Kafka Cassandra 连接器实际上并未写入数据库

Kafka使用logstash流到cassandra

使用Kafka+Spark+Cassandra构建实时处理引擎

如何在从 Spark 消费 Kafka 时获取偏移 id,将其保存在 Cassandra 中并使用它来重新启动 Kafka?