Apache Kafka 中的事件外包

Posted

技术标签:

【中文标题】Apache Kafka 中的事件外包【英文标题】:Eventsourcing in Apache Kafka 【发布时间】:2017-05-05 19:59:37 【问题描述】:

使用 Kafka 作为事件存储可以正常工作,只需将消息保留设置为无限制即可。

但我也看到了一些关于 Kafka 也被用于事件溯源的报道。 这就是我对这怎么可能感到困惑的地方。 作为一个事件存储,我可以把我的消息推到那里。并根据需要使用或回放。

但对于事件溯源,您很可能希望读取给定实体/聚合 ID 的事件。 您当然可以使用分区,但这似乎是在滥用这个概念,并且实际上很难添加新实体,因为分区计数更多地位于静态方面,即使您可以更改它。 有什么理智的解决方案吗? Apache Kafka 文档本身只简单地提到了事件溯源。

【问题讨论】:

这里有关于使用 Kafka 作为事件存储的讨论 ***.com/questions/17708489/…。他们还有 Kafka Streams,这可能是一个更好的抽象? Martin Kleppmann 有一个good talk,关于在事件驱动架构中使用 Kafka,如果有帮助的话。我的看法是,它们最初是 2 种完全不同的方法,可能需要一些工作和让步才能混合 这里还有更多confluent.io/blog/… 【参考方案1】:

我认为 Apache Kafka 是为 Event Sourcing 存储事件的最佳解决方案。事件溯源的概念非常接近,通常与 Greg Young 的名为 CQRS 的概念/实践一起使用,我建议您研究一下。

我在这个答案中使用的术语 repository 是按照 Eric Evans 书中的领域驱动设计而言的存储库。

我想我知道你的困惑是什么原因。

但对于事件溯源,您很可能希望读取给定实体/聚合 ID 的事件。

我认为您的上述问题是正确的。但是我认为你想表达一些不同的东西。你想表达这样的东西:

在事件溯源中,当要求存储库从其数据源中检索对象时,存储库必须在对存储库的每个请求中检索构成特定实体的所有事件。然后必须重播这些事件以构建对象。

真的是你想要表达的吗?因为上面那句话在我看来是false

您无需在每次检索对象时都重新构建它。

换句话说,您无需在每次从存储库中检索对象时重播构成该对象的所有事件。您可以在对象上播放事件并以不同的方式存储对象的当前版本,例如在缓存中甚至更好,无论是在缓存中还是在 kafka 中。

让我们来举个例子。假设我们有一个装载和卸载的轨道/货车。

事件的主要流将是操作 - 这将是我们应用程序中的第一个 kafka 主题。正如 Jay Kreps 通常在他的论文中所说的那样,这将是我们的真相来源

这些是事件:

轨道 1 上载满了猪 第 2 轨载满了猪 从猪身上卸下轨道 2 轨道 2 装满沙子 从猪身上卸下轨道 1 第 1 轨装满鲜花

最后的结果是track 1是鲜花,track 2是沙子。

您所做的是阅读该主题中的事件并填充您的第二个主题:trackUpdated。您流入 trackUpdated 主题的事件如下:

曲目 1:猪 曲目 2:猪 轨道 2:无 轨道 2:沙子 曲目 1:无 曲目 1:鲜花

同时,随着每条消息的消费,您会更新缓存中卡车的当前版本,例如内存缓存。因此,memcache 将成为存储库用来检索跟踪对象的直接来源。

更重要的是,您将 trackUpdated 主题设为压缩 主题。

阅读 Apache Kafka 官方文档中的压缩主题。在 Confluent 博客和 Linkedin Engineering 博客(在 Confluent 公司成立之前)有很多有趣的资料。

因此,由于 trackUpdated 已与 Kafka 兼容,因此在一段时间后它看起来像这样:

轨道 2:沙子 曲目 1:鲜花

如果您使用轨道 ID 作为所有消息的键,Kafka 将执行此操作 - 请在文档中阅读消息“键”是什么。因此,您最终会为每个曲目收到 1 条消息。如果您在应用中发现错误,您可以重播 operations 主题以再次填充您的缓存和 trackUpdated 主题。如果您的缓存出现故障,您可以使用 trackUpdated 主题来填充您的缓存。

你怎么看?投票和 cmets 非常受欢迎。

更新:

(1) 经过一番思考,我改变了主意,你的一句话是真的。我现在觉得是假的。所以我不**((认为对于事件溯源,您很可能希望读取给定实体/聚合 ID 的事件

当您在代码中发现错误时,您希望重播所有对象的所有事件。无论是像我的简单示例中那样有 2 个实体还是有 1000 万个实体都没有关系。

事件溯源不是检索特定实体的所有事件。事件溯源是指您拥有所有事件的审核日志,并且您能够重放它们以重建您的实体。您不需要能够重建单个特定实体

(2) 强烈建议熟悉 Confluent 和 LinkedIn 工程博客的一些博文。以下对我来说非常有趣:

https://www.confluent.io/blog/making-sense-of-stream-processing/

https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying

官方的 Kafka 文档也是必须的。

【讨论】:

感谢您的努力,但答案与主题无关。 “真的是你想要表达的吗?”不,问题与 DDD 或 CQRS 无关。我对这些很熟悉。我在问如何或是否可以使用 Kafka 进行事件搜索。假设我有 1000 万个实体,我可能不希望一次将它们全部加载到服务器的内存中。我可以使用 Kafka 加载单个聚合的数据而不重播所有内容吗? @RogerAlsing Kafka 在事件溯源方面为您提供的是能够在您每次需要时非常轻松地重播所有事件。要回答您的具体问题:不,您不能为单个聚合加载数据而无需重播所有内容。我的回答是试图说服你,问问自己你是否真的想要/需要这样做。我认为你没有。您也不需要将它们全部加载到内存中。可能是一些 SQL、elastic、mongo 等。压缩主题在这里也有很大帮助。我已经更新了我的答案,请检查结尾!抱歉跑题了! :) 干杯! @miglanc 您建议始终为外部数据存储和压缩 Kafka 主题中的每个实体以及另一个 Kafka 主题中的原始事件维护当前状态快照?我的理解正确吗? @RogerAlsing 不,由于主题限制,我不这么认为。 Kafka 版本的事件溯源似乎是针对每个类别的主题,给定类型的所有实体的所有事件都会发生。 @tomliversidge (1) 是的,我建议将所有原始事件保存在某个地方。必须的。这就是 ES 的全部内容:审计日志 + 重播能力。它不一定是 Kafka,但我认为它是最好的数据存储。 (2) 是的,我建议在至少一个最适合您的用例的数据存储中维护每个实体的当前状态快照。可能是 Kafka + memcached。它可能只是 SQL 或 ElasticSearch。这取决于您的用例。关键不是每次要检索实体时都计算所有事件。请参阅 Martin Fowler 视频“事件溯源”。你呢?【参考方案2】:

关于您对另一个问题的评论:

感谢您在这里的努力,但答案与主题无关。 “真的是你想要表达的吗?”不,问题与 DDD 或 CQRS 无关。我对这些很熟悉。我在问如何或是否可以使用 Kafka 进行事件追踪。假设我有 1000 万个实体,我可能不希望一次将它们全部加载到服务器的内存中。我可以使用 Kafka 加载单个聚合的数据而不重播所有内容吗?

答案是肯定的:您可以使用Kafka Streams 来处理事件。您的流逻辑生成聚合并将它们存储在本地状态存储 (RocksDB) 中,因此生成的聚合不需要在内存中,并且可以在不重播所有事件的情况下访问。您可以使用Interactive Queries API 访问这些聚合。这是相当不错的!此时,编写可重放的事件处理逻辑是easier said than done,但无论如何也不是不可能的。

【讨论】:

以上是关于Apache Kafka 中的事件外包的主要内容,如果未能解决你的问题,请参考以下文章

「企业事件枢纽」Apache Kafka支持ACID事务吗?

Apache Beam 管道中的连续状态

angular.js 与 apache kafka 的集成

「首席看Kafka」Apache Kafka中的事务

Kafka---将kafka中的数据导入HBase

如何摆脱 NoSuchMethodError:Spark Streaming + Kafka 中的 org.apache.kafka.clients.consumer.KafkaConsumer.su