分享一个CQRS/ES架构中基于写文件的EventStore的设计思路
Posted ENode
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分享一个CQRS/ES架构中基于写文件的EventStore的设计思路相关的知识,希望对你有一定的参考价值。
最近打算用C#实现一个基于文件的EventStore。
什么是EventStore
关于什么是EventStore,如果还不清楚的朋友可以去了解下CQRS/Event Sourcing这种架构,我博客中也有大量介绍。EventStore是在Event Sourcing(下面简称ES)模式中,用于存储事件用的。从DDD的角度来说,每个聚合根在自己的状态发生变化时都会产生一个或多个领域事件,我们需要把这些事件持久化起来。然后当我们需要恢复聚合根的最新状态到内存时,可以通过ES这种技术,从EventStore获取该聚合根的所有事件,然后重演这些事件,就能将该聚合根恢复到最新状态了。这种技术和mysql的Redo日志以及Redis的AOF日志的原理是类似的。但是区别是,redo/AOF日志是Command Sourcing,而我们这里说的是Event Sourcing。关于这两个概念的区别,我不多展开了,有兴趣的朋友可以去了解下。
为什么要自己写一个EventStore
目前ENode使用的EventStore,是基于关系型数据库SqlServer的。虽然功能上完全满足要求,但是性能上和数据容量上,离我的预期还有一些距离。比如:
关于性能,虽然可以通过SqlBulkCopy方法,实现较大的写入吞吐,但是我对EventStore的要求是,需要支持两个唯一索引:1)聚合根ID+事件版本号唯一;2)聚合根ID+命令ID唯一;当添加这两个唯一索引后,会很大影响SqlBulkCopy写入数据的性能;而且SqlBulkCopy只有SqlServer才有,其他数据库如MySQL没有,这样也无形之中限制了ENode的使用场景;
关于使用场景,DB是基于SQL的,他不是简单的帮我们保存数据,每次写入数据都要解析SQL,执行SQL,写入RedoLOG,等;另外,DB还要支持修改数据、通过SQL查询数据等场景。所以,这就要求DB内部在设计存储结构时,要兼顾各种场景。而我们现在要实现的EventStore,针对的场景比较简单:1)追求高吞吐的写入,没有修改和删除;2)查询非常少,不需要支持复杂的关系型查询,只需要能支持查询某个聚合根的所有事件即可;所以,针对这种特定的使用场景,如果有针对性的实现一个EventStore,我相信性能上可以有更大的提升空间;
关于数据量,一个EventStore可能需要存储大量的事件,百亿或千亿级别。如果采用DB,那我们只能进行分库分表,因为单表能存储的记录数是有限的,比如1000W,超过这个数量,对写入性能也会有一定的影响。假设我们现在要存储100亿事件记录,单表存储1000W,那就需要1000个表,如果单个物理库中分100个表,那就需要10个物理库;如果将来数据量再增加,则需要进一步扩容,那就需要牵涉到数据库的数据迁移(全量同步、增量同步)这种麻烦的事情。而如果是基于文件版本的EventStore,由于没有表的概念了,所以单机只要硬盘够大,就能存储非常多的数据。并且,最重要的,性能不会因为数据量的增加而下降。当然,EventStore也同样需要支持扩容,但是由于EventStore中的数据只会Append写入,不会修改,也不会删除,所以扩容方案相对于DB来说,要容易做很多。
那为何不使用NoSQL?NoSQL一般都是为大数据、可伸缩、高性能而设计的。因为通常NoSQL不支持上面第一点中所说的二级索引,当然一些文档型数据库如MongoDB是支持的,但是对我来说是一个黑盒,我无法驾驭,也没有使用经验,所以没有考虑。
从长远来看,如果能够自己根据自己的场景实现一个有针对性的EventStore,那未来如果出现性能瓶颈的问题,自己就有足够的能力去解决。另外,对自己的技术能力的提高也是一个很大的锻炼机会。而且这个做好了,说不定又是自己的一个很好的作品,呵呵。所以,为何不尝试一下呢?
EventStore的设计目标
要求高性能顺序写入事件;
要求严格判断聚合根的事件是否按版本号顺序递增写入;
支持命令ID的唯一性判断;
支持大量事件的存储;
支持按照聚合根ID查询该聚合根的所有事件;
支持动态扩容;
高可用(HA),需要支持集群和主备,二期再做;
EventStore核心问题设计方案
写入每一个事件时需要保证两个业务规则
首先我们先看一下每次写入一个事件时,客户端会传给我们什么信息:
聚合根ID
事件版本号
命令ID
事件内容
事件发生时间
针对上面的设计目标,写入一个事件到EventStore时,我们需要保证两个业务规则:1)当前事件的版本号必须是聚合根的当前版本号(前一个写入的事件的版本号)+1;2)命令ID唯一;
为什么要保证这两个业务规则呢?
第一个是为了能支持检测聚合根在并发更新时产生的并发冲突,当同一个聚合根在两个线程中同时被更新(虽然ENode基本保证了不会出现这种情况,但设计上没有做到绝对的避免),则这两个更新所产生的事件的版本号是一样的,而这种情况是不允许的,同一个聚合根的修改必须线性修改。所以EventStore需要能检测出来这种情况,并告诉客户端;
第二个是为了能够自动检测出同一个CQRS的命令是否重复执行了,也就是为了命令的幂等处理。因为现代的大部分分布式消息队列如kafka, rocketmq, rabbitmq, equeue都无法保证消息不会重复投递,所以,任何一个命令都有可能被重复执行,当一个命令被一先一后被执行两次,然后产生两个事件,虽然此时这两个事件的版本号都是没问题的,但是因为重复执行了命令,所以很可能会导致最后的结果不正确。所以需要在底层的数据存储层面检测出这种情况,并返回给客户端。通常如果使用DB,针对上面这两个业务规则,我们可以建立两个唯一索引即可。
当然,也许你会说,我们可以把这两个业务规则交给上层应用保证啊,不一定必须在EventStore中做掉。确实,上层应用也可以做,但上层应用因为是无状态的,而上面这两个业务规则的检查都需要依赖于状态;另外一个原因,上层应用都是集群部署的,所以,如果要由上层自己保证,那必须要用到类似于分布式锁的东西,整个架构的性能立马下降一个档次。
那如何保证这两个业务规则呢?
第一个业务规则的思考:
很容易想到,我们必须保存当前聚合根的最新版本,这样在下一个事件过来时,才能判断出下一个事件的版本是否是当前版本+1。针对这个问题,基于C#语言,最容易想到的就是,我们可以在本地托管内存中维护一个ConcurrentDictionary<string, uint>这样的字典。其中key为聚合根ID,value为聚合根的当前版本号。这样当一个事件过来时,就能实现上述的判断了。但是,假设单台EventStore上有1亿个聚合根,那就意味着这个字典中就有1亿个key,这样这个字典就会占用不少的内存,初步估算了一下,至少有4GB吧。在这么大的内存占用下,GC很可能会有问题。
那怎么办呢?另一个方案是使用非托管内存来存储这个字典。但是非托管内存中如何实现一个这样的字典我没太多经验,不是C++出生,呵呵。会的同学可以帮我想想怎么实现,这个对于C++开发来说,应该是比较简单的需求吧。
关于解决GC的问题,我觉得还有一个办法也许可行,但我还没做充分测试,大伙有经验的也可以帮我看看。思路是:
设计一个环形数组,数组的大小在EventStore启动时进行初始化,比如为1KW。然后数组中每个元素为一个对象(假设叫VersionEntry),该对象中有两个字段:聚合根ID、聚合根当前版本号;然后,当一个聚合根的事件过来时,我们根据聚合根的ID的hashcode取摸环形数组的大小,就能知道该聚合根在数组中的下标了,然后根据下标把VersionEntry拿到,然后判断VersionEntry中的聚合根ID是否和当前聚合根的ID相同,如果相同,说明当前聚合根的最新版本号在这个环形数组中找到了;如果不相同,则认为没找到。然后,如果找到的情况下,就更新最新版本号为下一个版本号;如果没找到,则需要从磁盘尝试加载该聚合根的最新版本号,这个问题下面会讲到如何实现。
通过这个设计,我们将一定数量的聚合根的最新版本号缓存在一个巨大的数组中,然后EventStore启动时,就预先初始化好整个数组中的所有对象,当然,此时这些对象的聚合根ID和版本号都是空的。通过之前学习NFX的源码,我相信通过这样的数组,可以极大程度的降低Full GC的耗时代价,因为g2没有任何内存碎片,不需要压缩移动。另外,关于这个环形数组,还有一个优化点,就是hashcode可以支持二级。就是当一级hashcode对应的VersionEntry已经存在且聚合根ID和当前聚合根ID不相同时,自动将该VersionEntry的位置替换为一个新的VersionEntry数组,数组大小不需要太大,比如为7。然后把新老聚合根的信息放入这个新的VersionEntry中,当然,即便是二级hash,还是有可能出现哈希碰撞冲突,此时就覆盖老数据即可。另外,还有一点比较重要,环形数组的大小应该是质数。
还有最后一点需要再强调一下:
不是说当前的EventStore机器上存储了1亿个聚合根的事件,我们的字典或者环形数据就必须要保存1亿个key。我们应该根据实际服务器的内存大小以及GC的影响,来综合判断应该缓存多少聚合根。当然,我作为框架设计者,在设计这个缓存方案时,会尽力确保在缓存非常多的key的时候,也没有什么大的副作用,比如GC。也就是说,尽量在软件层面做到无瓶颈,尽量能支持到只要物理内存大小足够,就能支持配置多少大的缓存要求。
第二个业务规则的思考:
第二个业务规则,是一个典型的kv的需求场景,而且我们只需要使用嵌入式的kv即可。两个选择:1)自己实现一个;2)使用开源的成熟的高性能嵌入式的kv,如leveldb,stsdb;经过考虑后,还是选择使用方案二。主要是我觉得既然有成熟的东西可以使用,就应该使用,而不是自己造轮子。目前暂定使用leveldb,当然具体使用哪个还需要进一步调研。
key的设计:命令ID作为key即可。处理逻辑:一个事件过来时,判断命令ID是否重复,如果重复,就直接返回告诉客户端命令重复了;否则继续往下处理。
如何高性能写入事件以及事件索引?
如何存储数据?
我们需要存储的数据有三种,如下:
事件本身数据,写入到数据文件。单个数据文件的大小固定,比如每个文件1GB。写入方式为二进制数据顺序写文件,一个文件写满后,新建下一个文件,继续顺序写到新文件;写数据文件时不需要做任何业务规则检查,只管写二进制数据即可。这个设计和EQueue中存放消息的文件一样,本文就不多做介绍了。有兴趣的朋友可以看看这篇文章:http://www.cnblogs.com/netfocus/p/4927495.html
事件索引数据,记录每个聚合根的每个版本对应的事件在数据文件中的物理位置;有了这个索引数据,我们就能实现根据某个聚合根ID获取该聚合根的所有版本的事件的需求了;先查索引数据获取该聚合根的所有版本的事件在数据文件中的物理位置,再根据这些位置最终拿到事件信息。那事件索引数据如何存放呢?也是通过leveldb即可,key为aggId_version,即聚合根ID+聚合根版本号。value为该版本的事件在数据文件中的物理位置;
命令ID数据,我们需要记录所有的事件的命令ID,这样才能当一个事件过来时,判断该事件对应的命令是否已经处理过。这个同样使用leveldb即可,命令ID作为key。
当一个事件过来时的处理逻辑:
先判断命令是否被处理过:到leveldb查找key是否存在,判断命令是否已被处理过;如果已被处理过,则直接返回该事件已被处理过的结果给客户端;
判断事件版本号是否合法:如果命令未被处理过,则判断当前事件的版本号是否是当前聚合根的当前版本号的下一个版本号;上面介绍第一个业务规则时,我们了解到,聚合根的当前版本号很可能在缓存(环形数组)里,如果在,则直接可以拿出来判断;如果不在,则需要从leveldb加载当前版本号。那加载哪些版本号呢?举个例子来说明吧:假设当前事件的版本号为10,则从事件索引leveldb中尝试获取版本号为9的事件以及10的事件。如果存在10的事件,则说明遇到并发冲突了,直接返回客户端结果告诉客户端并发冲突;如果10不存在,则继续判断9是否存在,如果存在,则符合预期,也就是第一条业务规则满足条件。如果9也不存在,则认为当前事件的版本号非法,也返回客户端相应结果即可;因为如果当前聚合根的当前版本号为8,那是不可能过来一个版本号为10的事件的,过来的一定是9,因为聚合根的版本号总是按一依次递增的。
如果命令和事件版本都合法,就开始写入数据:1)写入事件到事件数据文件,2)写入事件索引到leveldb,3)写入命令ID数据到leveldb;
三种数据都写完成功后,更新缓存中当前聚合根的当前版本号为当前事件的版本号;
性能分析:
当一个事件过来时,我们一般是需要访问三次IO,1)顺序写事件到事件数据文件;2)写入事件索引到leveldb;3)写入命令ID到leveldb;大家觉得这3个写入操作,最终可以提供多少的写入TPS?我的目标是单机能支持50000TPS。大家觉得这个设计能否做到呢?还是等待最终开发完成后进行测试吧。
如何支持查询?
如何解决多线程并发写的时候的CPU占用高的问题?
到这里,我们分析了如何存储数据,如何写入数据,还有如何查询聚合根的所有事件,应该说基本功能已经实现了。另外,如果是单线程访问EventStore,我相信性能不会很低了。但是如果是N多客户端同时并发写事件呢?这个时候就会导致EventStore服务器会有很多线程要求同时写入事件到数据文件,但是大家知道写文件必须是单线程的,如果是多线程,那也要用锁的机制,保证同一个时刻只能有一个线程在写文件。最简单的办法就是写文件时用一个lock搞定。但是经过测试发现简单的使用lock,在多线程的情况下,会导致CPU很高。因为每个线程在处理当前事件时,可能需要涉及到多次IO,所以锁的占用时间比较长,导致很多线程都在阻塞等待。
为了解决这个问题,做了一些调研,最后决定使用双缓冲队列的方式来解决。大致思路是:
设计两个队列,将要写入的事件先放入队列1,然后当前要真正处理的事件放在队列2。这样就做到了把接收数据和处理数据这两个过程在物理上分离,先快速接收数据并放在队列1,然后处理时把队列1里的数据放入队列2,然后队列2里的数据单线程线性处理。这里的一个关键问题是,如何把队列1里的数据传给队列2呢?是一个个拷贝吗?不是。这种做法太低效。更好的办法是用交换两个队列的引用的方式。具体思路这里我不展开了,大家可以网上找一下相关概念。这个设计我觉得最大的好处是,可以有效的降低多线程写入数据时对锁的占用时间,本来一次锁占用后要直接处理事件的,而现在只需要把事件放入队列即可。双缓冲队列可以在很多场景下被使用,我认为,只要是多个消息生产者并发产生消息,然后单个消费者单线程消费消息的场景,都可以使用。而且这个设计还有一个好处,就是我们可以单线程批量处理队列2里的数据。
如何扩容?
我们再来看一下最后一个我认为比较重要的问题,就是如何扩容。
虽然我们单台EventStore机器只要硬盘够大,就可以存储相当多的事件。但是硬盘再大也有上限,所以扩容的需求总是有的。所以如何扩容呢?上面我提到,持久化的数据有三种,经过分析后发现,其实要扩容的数据只有第一种,即事件数据本身。因为事件里包含了所有的信息,聚合根ID,命令ID,事件版本号等。可以说,有了事件数据,我们就能得到另外两种数据了。归根结底,另外两种数据只是事件的两种二级索引。事件索引数据是根据聚合根ID对事件建立索引;命令ID数据是根据命令ID对事件建立索引。所以,基于这个前提,扩容就很简单了,我们只需要将事件数据进行扩容即可。
那如何扩容呢?假设现在有4台EventStore机器,要扩容到8台。
有两个办法:
土豪的做法:准备8台全新的机器,然后把原来4台机器的全部数据分散到新准备的8台机器上,然后再把老机器上的数据全部删除;
屌丝的做法:准备4台全新的机器,然后把原来4台机器的一半数据分散到新准备的4台机器上,然后再把老机器上的那一半数据删除;
对比之下,可以很容易发现土豪的做法比较简单,因为只需要考虑如何迁移数据到新机器即可,不需要考虑迁移后把已经迁移过去的数据还要删除。而EventStore的数据是不允许删除的,只允许追加写。所以,我放弃了第二种做法。所以,接下来只需要考虑如何实现第一种做法。大体的思路是:
采用拉的方式,新的8台目标机器都在向老的4台源机器拖事件数据;目标机器记录当前拖到哪里了,以便如果遇到意外中断停止后,下次重启能继续从该位置继续拖;
每台源机器都扫描所有的事件数据文件,一个个事件进行扫描,扫描的起始位置由当前要拖数据的目标机器给出;
每台目标机器该拖哪些事件数据?一种可行的方法是:预先在源机器上配置好这次扩容的目标机器的所有唯一标识,如IP;然后当某一台目标机器过来拖数据时,告知自己的机器的IP。然后源机器根据IP就能知道该目标机器在所有目标机器中排第几,然后源机器就能知道应该把哪些事件数据同步给该目标机器了。举个例子:假设当前目标机器的IP在所有IP中排名第3,则针对每个事件,获取事件的聚合根ID,然后将聚合根ID hash 取摸8,如果余数为3,则认为该事件需要同步给该目标机器,否则就跳过该事件;通过这样的思路,我们可以保证同一个聚合根的所有事件都最终同步到了同一台新的目标机器。只要我们的聚合根ID够均匀,那最终一定是均匀的把所有聚合根的事件均匀的同步到目标机器上。
当目标机器上同步过来一条事件数据时,同时更新leveldb中的事件索引数据和命令ID数据;
扩容过程的数据同步迁移的思路差不多了。但是扩容过程不仅仅只有数据迁移,还有客户端路由切换等。那如客户端何动态切换路由信息呢?或者说如何做到不停机动态扩容呢?呵呵。这个其实是一个外围的技术。只要数据迁移的速度跟得上数据写入的速度,然后再配合动态推送新的路由配置信息到所有的客户端。最终就能实现动态库容了。这个问题我这里先不深入了,搞过数据库动态扩容的朋友应该都了解原理。无非就是一个全量数据迁移、增量数据迁移、数据校验、短暂停止写服务,切换路由配置信息这几个关键的步骤。我上面介绍的是最核心的数据迁移的思路。
结束语
本文介绍了我之前一直想做的一个基于文件版本的EventStore的关键设计思路,希望通过这篇文章把自己的思路系统整理出来。一方面通过写文章可以进一步确信自己的思路是否OK,因为如果你文章写不出来,其实思路一定是哪里有问题,写文章的过程就是大脑整理思绪的过程。所以,写文章也是检查自己设计的一种好方法。另一方面,也可以通过自己的原创分享,希望和大家交流,希望大家能给我一些意见或建议。这样也许可以在我动手写代码前能及时纠正一些设计上的错误。最后再补充一点,语言不重要,重要的是设计思路。谁说C#语言做不出好东西呢?呵呵。
以上是关于分享一个CQRS/ES架构中基于写文件的EventStore的设计思路的主要内容,如果未能解决你的问题,请参考以下文章