如何在事件源应用程序中管理读取请求

Posted

技术标签:

【中文标题】如何在事件源应用程序中管理读取请求【英文标题】:How to manage read requests in an event sourced application 【发布时间】:2018-02-12 09:21:37 【问题描述】:

我被要求对事件溯源进行一些探索。我的目标是创建一个满足所有传统 CRUD 操作的微型 API 层。我现在正在使用一个名为“sourced”的包并尝试使用它(使用 Nodejs)。

但是,我开始意识到事件溯源在单独使用时并不是很有用。通常,它与 CQRS 相结合。

我对 CQRS 的理解是,当 UI 向服务器发送写入命令时。该应用程序对数据进行了一些验证。并将其保存在事件存储中(我使用的是 mongoDB),例如:这是我的事件存储的样子:

method:"createAccount",name:"user1", account:1
method:"deposit",name:"user1",account: 1 , amount:100
method:"deposit",name:"user1",account: 1 , amount:100
method:"deposit",name:"user1",account: 1 , amount:100
method:"withdraw",name:"user1",account1,amount:250

它包含所有审计信息,而不是最终状态。 但是,我很困惑如何处理读取操作。如果我想读取账户余额怎么办。究竟会发生什么? 这是我的问题:

    如果我们不能直接查询事件存储(数据库)进行读取操作,那么我们应该在哪里查询呢?它应该是内存中的缓存吗? 如果我们查询内存。是已经存在的最终状态还是我必须进行重放(或左折叠)操作来计算结果。例如,账户 1 的余额为 50。 我发现一些博主谈到了“订阅”或“广播”。它们是什么,向谁广播?

我将非常感谢任何建议,如果我的理解有误,请纠正我。

【问题讨论】:

对我来说,您在问题中没有意识到的一个概念是事件通常在流中维护,在 DDD 意义上,这些流是按聚合隔离的。在决策过程中考虑的事件序列仅限于您在编写时并置的事件。在查询方面,您可以在某些低负载情况下通过阅读+折叠整个流来破解它。一般来说,正如答案所暗示的,预测是如何实现查询的等价物。 intro docs for GES 很好读 @RubenBartelink 是的,你是对的!我不知道如何使用事件流。我应该研究一下流维护的事情。感谢您的建议! 【参考方案1】:

很好的问题尼克。您缺少的概念是“投影”。当事件持续存在时,您将广播该事件。您的投影代码侦听特定事件,然后执行更新和创建“读取模型”等操作。读取模型是最终状态的一个版本(通常是持久化的,但可以在内存中完成)。

好处是您可以高度优化这些阅读模型以进行阅读。告别复杂和低效的连接等。

因为读取模型不是事实的来源,并且它是专门为读取而设计的,所以可以在其中包含数据重复。只要确保在收到适当的事件时管理它。

有关更多信息,请查看以下文章:

Overview of a Typical CQRS and ES Application** How to Build a Master Details View when using CQRS and Event Sourcing Handling Concurrency Conflicts in a CQRS and Event Sourced system

希望你觉得这些有用。

** 该图指的是非规范化,它应该谈论投影。

【讨论】:

你的回答消除了我的困惑!多谢!我现在正在实施一个小型银行存款/取款申请。事件发生后。我广播这个事件(例如'deposit'),监听'deposit'的对应部分会处理这个deposit事件来更新余额。这是正确的吗? 顺便说一下,假设我在数据库中有数百万条记录。这是否意味着如果我重新启动服务器。初始化要更新的 ReadModel 可能需要一段时间? 几乎是您的第一条评论。改变命名命令和事件的方式。命令是命令式的,事件总是过去时。关于重新初始化。重新启动应用程序应该不会比任何其他应用程序花费更长的时间。您应该持久化读取模型。希望这是有道理的。【参考方案2】:

您可以查询事件存储。查询的实际方法特定于每个实现,但通常您可以轮询事件或订阅并在新事件持续存在时收到通知。

事件存储只是写入端的持久性,它保证写入操作的强一致性和读取操作的最终一致性。为了从事件中“理解”某些内容,您需要将这些事件投影到读取模型,然后查询读取模型。例如,您可以拥有一个读取模型,其中包含每个帐户的当前余额作为 MongoDB 集合。

【讨论】:

谢谢!您的回答帮助我理解了它!我现在正在实现向监听器广播,然后监听器可以处理相应的方法(例如,'withdraw')

以上是关于如何在事件源应用程序中管理读取请求的主要内容,如果未能解决你的问题,请参考以下文章

如何在beantalk worker中批量读取sqs消息

在CDK中通过ARN将事件源添加到Lambda

如何在 COM 事件源和处理程序中指定“事件类型”?

如何使用 java 解析水槽事件(Twitter 源)

libev学习

同步关注点的事件溯源