与 DDD + CQRS + ES 的并发

Posted

技术标签:

【中文标题】与 DDD + CQRS + ES 的并发【英文标题】:Concurrency with DDD + CRQS + ES 【发布时间】:2018-09-05 02:50:21 【问题描述】:

我研究 DDD 已经有一段时间了,偶然发现了 CQRS 和事件溯源 (ES) 等设计模式。这些模式可用于帮助以更少的努力实现 DDD 的某些概念。

然后我开始开发一个简单的软件来实现所有这些概念。并开始想象可能的失败路径。

为了澄清我的架构,下图描述了一个来自前端并到达控制器的请求,我是后端(为简单起见,我忽略了所有过滤器、绑定器)。

    演员发送一份表格,其中包含他想从一个帐户中提取的金额。 控制器将视图模型传递到应用层,在应用层中将其转换为一个命令 应用层打开一个工作单元 (UOW) 将 VM 映射到命令并将命令发送到调度程序。 调度程序找到知道如何处理命令(帐户)的相应聚合类,并向工厂请求帐户的特定实例。 工厂创建一个新的帐户实例并从事件存储中请求所有事件。 事件存储返回帐户的所有事件。 工厂将所有事件发送到聚合,以便其内部状态正确。并返回帐户的实例。 调度程序将命令发送到帐户,以便对其进行处理。 账户检查是否有足够的钱进行提款。如果有,它会发送一个新事件“MoneyWithdrawnEvent”。 此事件由更改其内部状态的聚合(帐户)处理。 应用程序层关闭 UOW,当它关闭时,UOW 检查所有加载的聚合以检查它们是否有新事件要保存到事件存储中。如果有,它将事件发送到存储库。 存储库将事件保存到事件存储中。

可以添加很多层,例如:聚合的缓存、事件的缓存、快照等。

有时 ES 可以与关系数据库并行使用。因此,当 UOW 保存已发生的新事件时,它还将聚合保存到关系数据库中。

ES 的一个好处是它有一个中心数据源,即事件存储。因此,即使内存甚至关系数据库中的模型损坏,我们也可以从事件中重建模型。

有了这个事实来源,我们可以构建其他系统,以不同的方式使用事件来形成不同的模型。

但是,要实现这一点,我们需要真理的来源是干净的,没有被破坏的。否则所有这些好处都不会存在。

也就是说,如果我们在图中描述的架构中考虑并发性,可能会出现一些问题:

如果actor在一个排序周期内向后端发送了两次表单,后端启动了两个线程(每个请求一个),那么他们会调用两次应用层,并启动两个UOW,以此类推.这可能会导致两个事件存储在事件存储中。

这个问题可以在很多不同的地方处理:

    前端可以控制哪些用户/参与者可以执行什么操作以及执行多少次。

    Dispatcher 可以拥有所有正在处理的命令的缓存,如果存在引用同一聚合(帐户)的命令,则会引发异常。

    存储库可以创建一个新的聚合实例,并在保存之前运行事件存储中的所有事件,以检查版本是否仍与第 7 步中获取的版本相同。

每个解决方案的问题:

    前端

    用户可以通过编辑一些 javascript 来绕过这个限制。 如果打开了多个会话(例如,不同的浏览器),则需要一些静态字段来保存对所有打开的会话的引用。并且有必要锁定一些静态变量才能访问该字段。 如果有多个服务器用于执行特定操作(水平缩放),则此静态字段将不起作用,因为有必要在所有服务器之间共享此字段。因此,需要一些层(例如 Redis)。

    命令缓存

    要使此解决方案起作用,有必要在读取和写入命令缓存时锁定命令缓存的某些静态变量。

    如果有多个服务器用于执行应用层的特定用例(水平扩展),则此静态缓存将不起作用,因为有必要在所有服务器之间共享此缓存。因此,需要一些层(例如 Redis)。

    存储库版本检查

    要使该解决方案有效,有必要在检查(数据库版本等于步骤 7 中获取的版本)并保存之前锁定一些静态变量。

    如果系统是分布式的(水平比例),则有必要锁定事件存储。因为,否则,两个进程都可以通过检查(数据库的版本等于步骤 7 中获取的版本),然后一个保存,然后另一个保存。并且根据技术,不可能锁定事件存储。因此,将有另一个层来序列化对事件存储的每次访问并添加锁定存储的可能性。

这种锁定静态变量的解决方案有点好,因为它们是局部变量并且非常快。但是,依赖于 Redis 之类的东西会增加一些较大的延迟。如果我们谈论锁定对数据库的访问(事件存储),甚至更多。甚至更多,如果这必须通过其他服务来完成。

我想知道是否有任何其他可能的解决方案来处理这个问题,因为这是一个主要问题(事件存储损坏),如果没有办法解决它,整个概念似乎是有缺陷的。

我对架构中的任何更改持开放态度。例如,如果一种解决方案是添加一个事件总线,以便所有内容都通过它汇集,那很好,但我看不出这能解决问题。

我不熟悉的另一点是卡夫卡。不知道 Kafka 有没有针对这个问题提供的解决方案。

【问题讨论】:

无关:您可能需要更仔细地考虑您的界限在哪里。您的domain model 应该知道如何获取事件历史记录和withdraw command 并创建新事件。那是你的功能核心——所有的管道都应该在它之外。 您可能还需要考虑是否需要步骤 1 + 2。为什么不直接创建命令而不是视图模型并派生命令。这将减少您为获得相同的可测试结果而需要编写的代码的数量和复杂性。 嗨@VoiceOfUnreason,在图片中描述的架构中,Aggregate 是处理命令和事件(以及事件历史)的类。并且聚合在域层中。 嗨@Codescribler,我可以删除应用程序层,以便控制器负责将视图模型映射到命令中。但是,我更喜欢包含它,因此可以更轻松地测试代码。控制器变得更轻(更少的代码)。 我的意思是,为什么要映射任何东西?这是额外的复杂性,没有真正的收获。 【参考方案1】:

尽管您提供的所有解决方案都可以在某些特定情况下工作,但我认为最后一个解决方案 (3.2) 适用于更一般的用例。我在我的开源框架中使用它,效果很好。

因此,事件存储负责确保聚合不会被两个命令同时改变。

一种方法是使用乐观锁定。从事件存储加载聚合时,您会记住它的version。当您持久化这些事件时,您会尝试使用version + 1 附加它们。每个AggregateType-AggregateId-version 必须有一个唯一索引。如果追加失败,您应该重试整个过程(加载+句柄+追加)。

我认为这是最具可扩展性的解决方案,因为当分片键是 AggregateId 的子集时,它甚至适用于分片。

您可以轻松地将 MongoDB 用作 EventStore。在 MongoDB 原子地附加所有事件。

另一种解决方案是使用悲观锁定。您在加载聚合、附加事件、增加其版本和提交之前启动事务。您需要使用 2 个表/集合,一个用于聚合元数据+版本,一个用于实际事件。 MongoDB >= 4.0 有事务。

在这两种解决方案中,事件存储都不会损坏

我不熟悉的另一点是卡夫卡。不知道 Kafka 有没有针对这个问题提供的解决方案。

您可以将 Kafka 与事件溯源结合使用,但您需要更改架构。见this答案。

【讨论】:

不错的方法。为“AggregateType-AggregateId-version”使用一个索引可能会解决问题。因为如果两个进程在版本中加+1,第二个就无法保存了。但是,如果其中一个进程给出的命令生成两个事件而另一个只生成一个事件,我们将回到同样的问题。例如,一个命令是提取 100 美元,另一个是从帐户向同一帐户转移 100 美元(我知道这不是一个真实的用例)。第一个在分类帐中添加一个条目,第二个添加两个(债务和收入)。 @RodrigoRiskallaLeal 不完全是,因为您只有一个文档用于所有事件(每次提交一个文档)。 @RodrigoRiskallaLeal 看这里:github.com/xprt64/mongolina/blob/master/src/Mongolina/… @RodrigoRiskallaLeal 这是活动商店:github.com/xprt64/mongolina/blob/master/src/Mongolina/… 是的,如果我们使用像 MongoDb 这样的基于文档的解决方案来存储事件,我们可以拥有一个包含所有事件的文档。但这只是隐藏了问题,因为现在,如果两个并发进程尝试写入,第一个进程将保存,然后第二个进程将覆盖整个文档并使用它自己的事件保存它。比如一个提现$10,另一个提现$200,如果$200在第一个之后进入commit,那么MoneyWithdrawn $100的事件就会丢失。【参考方案2】:

简短回答:原子事务仍然是一回事。

更长的答案:要正确处理并发写入,您需要,或者需要条件写入(也称为比较和交换)。

使用日志:我们需要在步骤 6 之前获取锁,并在步骤 12 之后释放锁。

使用条件写入:在第 6 步,存储库将捕获一个并发谓词(可能是隐式的——例如,读取的事件计数)。在第 12 步执行写入时,将检查并发谓词以确保没有并发修改。

例如,HTTP API for Event Store 使用 ES-ExpectedVersion;客户端负责计算(从它获取的事件中)它期望写入发生的位置。

Gabriel Schenker 在他 2015 年的文章 Event Sourcing applied -- the Repository 中描述了 RDBMS 存储库和事件存储库。

当然,随着条件写入的引入,您应该考虑在写入失败时您希望模型做什么。您可能会引入重试策略(转到第 6 步),或者尝试合并策略,或者干脆失败并返回给发件人。

在您的条件写入示例中,我假设有必要在第 11 步中添加一个锁(以便它锁定事件存储以获取并发谓词)。并且只有在将新事件写入事件存储后才释放锁。否则,两个并发进程可以通过并发谓词检查并保存事件。

不一定。

如果您的持久性存储提供锁,但不提供条件写入,那么您的想法是正确的:在第 12 步中,存储库将获取锁,检查前提条件,提交新事件,然后释放锁。

但理解条件写入的持久性设备可以为您实现该检查。使用事件存储,存储库不需要获取锁。它将带有关于预期状态的元数据的事件发送到商店。事件存储本身使用该信息来执行条件写入。

没有魔法 - 有人 需要完成工作以确保并发写入不会相互干扰。但它不一定必须在您的代码中。

请注意,我使用的是 Eric Evans 在蓝皮书中所描述的“存储库”——这是hides your choice 对如何存储来自系统其余部分的事件的抽象;换句话说,是适配器让您的事件存储看起来像内存中的事件集合——而不是事件存储本身。

【讨论】:

在您的条件写入示例中,我假设有必要在步骤 11 中添加一个 Lock(以便它锁定事件存储以获取并发谓词)。并且只有在将新事件写入事件存储后才释放锁。否则,两个并发进程可以通过并发谓词检查并保存事件。 我知道,如果我们希望进程正常运行,则需要从第 6 步到第 12 步进行锁定。因为,这样第二个过程将尝试提取资金并且可能没有足够的信用。但是,如果我们只是希望我们的事件存储是正确的(没有损坏),那么仅从第 11 步到第 12 步锁定就足够了。对吗?

以上是关于与 DDD + CQRS + ES 的并发的主要内容,如果未能解决你的问题,请参考以下文章

DDD领域驱动设计:CQRS架构模式

DDD 中的那些模式 — CQRS

分享一个CQRS/ES架构中基于写文件的EventStore的设计思路

项目第一次实现 & CQRS初探

DDD-CQRS的落地案例

CQRS简单入门(Golang)