如何实现 CQRS 以及在哪里创建读取数据库

Posted

技术标签:

【中文标题】如何实现 CQRS 以及在哪里创建读取数据库【英文标题】:How is CQRS Implemented and where is Read DB getting created 【发布时间】:2021-10-30 22:22:34 【问题描述】:

我有发现服务:https://github.com/Naresh-Chaurasia/API-MicroServices-Kafka/tree/master/Microservices-CQRS-SAGA-Kafka/DiscoveryService

我有产品服务:https://github.com/Naresh-Chaurasia/API-MicroServices-Kafka/tree/master/Microservices-CQRS-SAGA-Kafka/ProductsService

我有 API 网关:https://github.com/Naresh-Chaurasia/API-MicroServices-Kafka/tree/master/Microservices-CQRS-SAGA-Kafka/ApiGateway

产品服务和 API 网关注册到发现服务。我使用 API Gateway 访问产品服务。

我正在学习为产品服务实施 CQRS 的课程。

在ProductService下,我有src/main/java/com/appsdeveloperblog/estore/ProductsService/command/ProductAggregate.java

这里的 ProductAggregate 是 CRQS 的 Command

它有以下几种方法(详情请参考GitHub):

@CommandHandler
public ProductAggregate(CreateProductCommand createProductCommand) throws Exception 
...


@EventSourcingHandler
public void on(ProductCreatedEvent productCreatedEvent) 
...


它还有src/main/java/com/appsdeveloperblog/estore/ProductsService/query/ProductEventsHandler.java,将产品持久化在H2 db中。

我也实现了src/main/java/com/appsdeveloperblog/estore/ProductsService/query/ProductsQueryHandler.java,用于查询db。

这里 ProductsQueryHandler 是 CRQS 的 Query

我的问题如下

    我无法理解发布事件的生成方式和时间,以及消息何时放入 消息传递 队列。 另外,有没有可能数据被持久化到Event Store后,没有保存在Read DB中。如果是,那么我们如何同步 Read DB。

【问题讨论】:

【参考方案1】:

我无法理解发布事件的生成方式和时间,以及消息何时放入消息队列。

它发生在 事件发布到事件存储之后。

您可以使用许多可能的设计将事件从事件存储复制到查询端的事件处理程序。这些将包括

让应用程序代码将事件复制到事件处理程序订阅的消息队列中 让事件处理程序按计划从事件存储中提取一批事件 让事件处理程序从事件存储中拉取事件,但使用消息队列宣布有新消息要拉取。

有没有可能是数据持久化到Event Store后,没有存入Read DB。

是的。这有多普遍取决于...嗯,实际上这主要取决于您在可靠性方面的投入。

这就是为什么拉模型趋于流行的原因——读取过程可以跟踪它所看到的事件,并在 X 之后请求下一批消息——其中 X 是时间戳或序列号,什么的。

警告:如果您尝试创建自己的事件存储,则正确获取这些详细信息可能会很棘手。除非活动商店的细节是您竞争优势的一部分,否则您真的想购买可靠性而不是尝试构建它。

【讨论】:

以上是关于如何实现 CQRS 以及在哪里创建读取数据库的主要内容,如果未能解决你的问题,请参考以下文章

程序员除了会CRUD之外,还应该知道什么叫CQRS!

程序员除了会 CRUD 之外,还应该知道什么叫 CQRS!

教程丨程序员除了会CRUD之外,还应该知道什么叫CQRS

CQRS 命令和域状态

如何制作动态表

如何在事件源应用程序中管理读取请求