基于事件驱动架构构建微服务第13部分:使用来自Apache KAFKA的事件并将投影流传输到ElasticSearch...

Posted dotNET跨平台

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于事件驱动架构构建微服务第13部分:使用来自Apache KAFKA的事件并将投影流传输到ElasticSearch...相关的知识,希望对你有一定的参考价值。

原文链接:https://logcorner.com/building-microservices-through-event-driven-architecture-part13-read-model-projection-project-streams-into-elasticsearch/

在本教程中,我将展示如何从KAFKA读取流并将流投影到ElasticSearch中。

我必须使用来自KAFKA的消息,我从KAFKA读取的消息是事件流。所以我必须将这些流投影到结构化表示中。然后我会将投影索引到ElasticSearch中。

所以我会建立一个订阅KAFKA并监听事件的消费者。如果它接收到一个event,它将使用投影来创建该事件的结构演示。最后将其存储到一个nosql数据库ElasticSearch。

投影事件

实际上,读取数千个事件会花费太长时间,相反我们可以预先计算当前状态并将其存储到nosql数据库中。投影可以定义为从一系列事件中导出的当前状态

我定义了一个基本的泛型类Entity,因此每个投影都将从它派生。

我定义了一个抽象的泛型类Projection,它接受一个事件列表并将它们应用于具体类(在我们的例子中是 SpeechProjection)。

SpeechProjection是一个表示我想从事件(SpeechCreatedEvent、SpeechTitleChangedEvent、SpeechDescriptionChangedEvent、SpeechUrlChangedEvent和SpeechTypeChangedEvent)重建其状态的实体的类。

因此,对于与给定实体(语音)相关的每个事件,我必须将事件应用于实体。

ElasticSearch介绍

Elasticsearch是一种分布式RESTful搜索和分析引擎,能够处理越来越多的用例。作为Elastic Stack的核心,它集中存储你的数据,以实现闪电般的快速搜索、微调相关性和可轻松扩展的强大分析。https://www.elastic.co/elasticsearch/

转到以下链接安装elasticsearch:https://www.elastic.co/downloads/elasticsearch

你可以通过使用PowerShell运行以下命令 curl http://localhost:9200/ 或 Invoke-RestMethod http://localhost:9200 来验证安装是否正常

以下代码创建一个通用存储库以连接到弹性搜索,并执行CRUD操作。

创建工作服务

ASP.NET Core Worker Service模板为编写长时间运行的服务应用程序提供了一个起点。

我们可以使用工作服务来构建不需要用户交互或执行定期和长时间运行的工作负载的应用程序。

https://docs.microsoft.com/fr-fr/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-5.0&tabs=visual-studio

我将使用Worker Service构建一个消费服务,该服务消费来自APACHE KAFKA的事件并将它们索引到ElasticSearch

ConsumerHostedService

ConsumerHostedService是承载ConsumerService的后台服务

ConsumerService

ConsumerService调用服务总线,该总线在产生新事件时从Kafka接收通知。

服务总线

KafkaClient

KafkaClient实现了IServiceBusProvider的ReceiveAsync。它订阅了一个Kafka主题,因此当一个事件发布到该主题时,它会通知一个中介服务。

ElasticSearchNotifier实现了INotificationHandler。这个类的职责是反序列化输入事件并将其索引到elasticsearch。

测试

启动zookeeper

zookeeper-server-start.bat config\\zookeeper.properties

启动Kafka

kafka-server-start.bat config\\server.properties

启动ElasticSearch

启动下列工程:

  • LogCorner.EduSync.SignalR.Server

  • LogCorner.EduSync.Speech.Producer

  • LogCorner.EduSync.Speech.Consumer 启动下列工程:

  • LogCorner.EduSync.Speech.Presentation

启动Postman并且post一个新command 

你应该在消费者控制台上看到以下输出,使用postman上发布的命令

代码源可在此处获得:

https://github.com/logcorner/LogCorner.EduSync.Speech.Command https://github.com/logcorner/LogCorner.EduSync.Speech.ServiceBus/tree/Feature/Task/IndexMessagesToElasticSearch

以上是关于基于事件驱动架构构建微服务第13部分:使用来自Apache KAFKA的事件并将投影流传输到ElasticSearch...的主要内容,如果未能解决你的问题,请参考以下文章

基于事件驱动架构构建微服务第14部分:查询API

基于事件驱动架构构建微服务第15部分:SPA前端

基于事件驱动架构构建微服务第7部分:在仓储上实现事件溯源

基于事件驱动架构构建微服务第4部分:repositories

基于事件驱动架构构建微服务第9部分:处理更新

基于事件驱动架构构建微服务第19部分:使用 SignalR 和 Azure Active Directory 构建和保护实时通信...