Axon Framework 扩展 - Spring AMQP

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Axon Framework 扩展 - Spring AMQP相关的知识,希望对你有一定的参考价值。

参考技术A 除了默认的 Axon Server 之外,Spring AMQP 是分发事件的另一种方法。

Axon 提供开箱即用的支持,可以将事件传入和传出 AMQP 消息代理,例如 RabbitMQ。

要使用 Axon 中的 Spring AMQP 组件,请确保 axon-amqp 模块在类路径中可用。

要将应用程序中生成的事件转发到 AMQP 通道,一行 application.properties 配置就足够了:

这将自动将所有已发布的事件发送到具有给定名称的 AMQP Exchange。

默认情况下,发送时不使用 AMQP 事务。 这可以使用 axon.amqp.transaction-mode 属性覆盖,并将其设置为 transactional 或 publisher-ack 。

Spring 广泛支持从 AMQP 队列中读取消息。 但是,这需要“桥接”到 Axon,以便可以从 Axon 处理这些消息,就好像它们是常规事件消息一样。

SpringAMQPMessageSource 允许事件处理器从队列而不是事件存储或事件总线中读取消息。 它充当 Spring AMQP 和这些处理器所需的 SubscribableMessageSource 之间的适配器。

要从队列接收事件并在 Axon 应用程序中处理它们,您需要配置 SpringAMQPMessageSource :

然后配置一个处理器来使用这个 bean 作为它的消息源:

请注意,tracking processor 与 SpringAMQPMessageSource 不兼容。

Axon 或 Kafka 支持 CQRS/ES

【中文标题】Axon 或 Kafka 支持 CQRS/ES【英文标题】:Axon or Kafka to support CQRS/ES 【发布时间】:2021-02-23 09:01:57 【问题描述】:

考虑一个简单的用例,我想将产品评分作为事件存储在事件存储中。

我可以使用两种不同的方法:

    使用 Axon:Rating 聚合负责处理 CreateRatingCommand 并发送 RatingCreatedEvent。发送事件将使评级存储在事件存储中。其他事件处理程序有可能在连接到 Axon 服务器实例时重播事件流并对评级进行任何需要的操作。在这种情况下,事件处理程序将用作流处理器。 使用 Kafka:KafkaProducer 将用于在 Kafka 主题中存储 Rating POJO(经过适当的序列化)。将主题的保留时间设置为无限期不会导致任何事件及时丢失。在这种情况下,Kafka Streams 将用于执行实际的评级处理逻辑。

我认为这两种方法都有一些架构问题:

使用轴突时:

    如果聚合体中没有需要维护或更改的真实状态,使用 Axon(或类似解决方案)是否有任何附加价值?聚合只是充当数据的“哑”占位符,但不提供任何状态更改逻辑。 Axon 如何处理同一事件类型的多个事件处理程序?它们会全部并行处理相同的事件(相同的聚合 id),还是同一事件仅由其中一个处理程序处理一次? 存储在 Axon 事件存储中的事件是否会保留到时间结束?

使用 Kafka 时:

    Kafka 将具有相同键的事件/消息存储在同一分区中。在用户产品评级的用例中,如何为键选择最佳值? UserId、ProductId 或两者的单独主题,并在两个主题中发布每个事件。 为每个用户和每个产品使用单独的主题是否会导致集群上出现大量主题? (大约 10k 用户)。

我不知道 SO 是否是此类问题的首选论坛...我只是想知道您(会)在这个特定用例中推荐什么作为最佳实践。期待您的反馈,并随时指出我在之前的问题中遗漏的其他想法。

EDIT@12/11/2020:我刚刚找到了一个related discussion,其中包含与我的问题相关的有用信息。

【问题讨论】:

SO 确实可能不是讨论这个问题的最佳平台,它很容易被“基于意见”或“过于广泛”而关闭……这是 axon 论坛discuss.axoniq.io。我会在那里尝试,他们不只是在那里销售他们的产品。 我同意!感谢您将我指向适当的论坛。我应该删除这篇文章吗? 无需删除。尤其是现在它有一个包含有价值信息的答案。很有可能它会因“基于意见”而关闭,所以我建议在discuss.axoniq.io/t/axon-or-kafka-to-support-cqrs-es/2889 继续任何进一步的讨论@似乎到目前为止参与的每个人都在那里。 【参考方案1】:

正如 Jan Galinski 已经说过的那样,这确实没有一个万无一失的答案。这值得更广泛的讨论,例如确实AxonIQ's Discuss forum。无论如何,这里有一些问题我绝对可以给出答案,所以让我们开始吧:

    Axon 问题 1 - 正如您所注意到的,Axon 框架经常用于以 DDD 为中心的应用程序。然而,没有什么能强迫你把自己建立在这个概念上。您可以将框架从事件溯源细节以及建模细节中完全剥离出来,纯粹为了不同命令、事件和查询的消息传递理念。当版本 4(当前)实际发布时,将 Axon Framework 版本 3 分成这些子部分是一个有意识的决定。除此之外,我认为不仅仅基于事件消息是很有价值的。使用不同的命令和查询只会进一步解耦您的组件,从而使应用程序环境更加丰富和容易扩展。 Axon Question 2 - 这取决于@EventHandler 注释方法的实际位置。如果它们在同一个类中,则只会调用一个。如果它们被定位到不同的类中,那么两者都将收到相同的事件。此外,如果它们在不同的类之间隔离,请务必注意 Axon 使用事件处理器作为调用事件处理程序的技术解决方案。如果不同的类被分组在同一个事件处理器下,你可以强加一个特定的顺序,首先调用哪个处理程序。除此之外,如果事件处理应该并行发生,您将必须配置一个所谓的TrackingEventProcessor(Axon Framework 中的默认值),因为它允许配置多个线程来同时处理事件。好吧,总结本节,您在问题二中提出的所有问题都是一种选择,也不是必需品。真的只是配置问题。可能值得在 Axon Framework 的 this 文档页面上查看此事。 Axon 问题 3 - 由于 Axon Server 服务于 Event Store,因此根本没有保留期。所以是的,它们默认保存到时间结束。但是,如果您觉得存储事件没有任何价值,例如作为所有模型的基础(就像您在使用事件源时所做的那样),那么没有什么可以阻止您删除事件。

这是我个人不太熟悉的 Kafka 问题(我猜是 Axon Framework 的贡献者)。不过,我也可以在这件事上给你两分钱,尽管我会在这里推荐第二种意见:

    Kafka 问题 1 - 根据我个人对此类应用程序所需的感觉,我假设您希望能够尽可能高效地检索给定产品的所有数据。我敢打赌,重要的是所有事件都在同一个分区中,以使这个过程尽可能高效,之后不需要任何合并。考虑到这一点,我认为使用 ProductId 最有意义。 Kafka 问题 2 - 如果您预计只有 5_000 个产品和 10_000 个用户,我想为这些设置单独的主题应该是可行的。 意见传入 - 虽然我个人认为 Kafka 的意图是让您直接决定何时使用主题,而不是您实际尝试实现的复杂功能,即哪些业务功能。从应用程序开发的角度来看,赋予分离流的能力更像是事后的想法。只要您需要企业级/高效的消息总线,我认为这就是该选项真正大放异彩的时候,因为那时您可以针对批量进行优化。

希望这一切能帮助您进一步@KDW!

【讨论】:

以上是关于Axon Framework 扩展 - Spring AMQP的主要内容,如果未能解决你的问题,请参考以下文章

Axon Framework - saga 基础设施

Axon 服务器命令不包含路由键

Axon 或 Kafka 支持 CQRS/ES

带有 Axon 的 Spring Boot JPA

Axon

DDD随笔-Axon