第20篇-不和谐如何索引数十亿条消息

Posted elasticsearchalgolia

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了第20篇-不和谐如何索引数十亿条消息相关的知识,希望对你有一定的参考价值。

我的Elasticsearch系列文章,逐渐更新中,欢迎关注
0A.关于Elasticsearch及实例应用
00.Solr与ElasticSearch对比
01.ElasticSearch能做什么?
02.Elastic Stack功能介绍
03.如何安装与设置Elasticsearch API
04.如果通过elasticsearch的head插件建立索引_CRUD操作
05.Elasticsearch多个实例和head plugin使用介绍

06.当Elasticsearch进行文档索引时,它是怎样工作的?

07.Elasticsearch中的映射方式—简洁版教程

08.Elasticsearch中的分析和分析器应用方式

09. Elasticsearch中构建自定义分析器

10.Kibana科普-作为Elasticsearhc开发工具
11.Elasticsearch查询方法

12.Elasticsearch全文查询

13.Elasticsearch查询-术语级查询

14.Python中的Elasticsearch入门

15.使用Django进行ElasticSearch的简单方法

16.关于Elasticsearch的6件不太明显的事情
17.使用Python的初学者Elasticsearch教程
18.用ElasticSearch索引MongoDB,一个简单的自动完成索引项目
19.Kibana对Elasticsearch的实用介绍
20.不和谐如何索引数十亿条消息
21.使用Django进行ElasticSearch的简单方法

另外Elasticsearch入门,我强烈推荐ElasticSearch新手搭建手册和这篇优秀的REST API设计指南 给你,这两个指南都是非常想尽的入门手册。

 

每月有数百万用户在Discord上发送数十亿条消息。一种搜索历史记录的方法迅速成为我们构建的最受欢迎的功能之一。让我们搜索吧!
要求
● 经济高效: Discord的核心用户体验是我们的文本和语音聊天。搜索是一项辅助功能,而反映这一功能所需的基础架构价格。理想情况下,这意味着搜索的费用不应超过消息的实际存储量。
● 快速直观:我们构建的所有功能都必须快速直观,包括搜索。我们产品的搜索体验也需要看起来和使用起来很棒。
● 自我修复:我们还没有一支专门的devop小组(因此),因此搜索需要能够以最少的操作员干预或完全没有操作员的干预来容忍失败。
● 线性可扩展:就像我们存储消息的方式一样,增加搜索基础结构的容量应涉及添加更多节点。
● 懒惰地索引:并非所有人都使用搜索-我们不应该对消息建立索引,除非有人尝试至少搜索一次。此外,如果索引失败,我们需要能够动态地重新索引服务器。

在查看这些要求时,我们向自己提出了两个关键问题:
问:我们可以将搜索外包给托管的SaaS吗?(简易模式)
A.不。我们研究过的每一项解决方案都进行了托管搜索,这会浪费我们的预算(天文数字很高)。此外,将消息从我们的数据中心中发送出去的想法与团队并不协调。作为一个注重安全的团队,我们希望控制用户消息的安全性,而不是让第三方知道他们在做什么。
问:是否存在可以使用的开源搜索解决方案?
答:是的!我们环顾四周,内部很快就开始讨论Elasticsearch vs Solr,因为两者都适合我们的用例。Elasticsearch具有优势:
● Solr上的节点发现需要ZooKeeper。我们运行etcd,并且不想拥有专门用于Solr的其他基础结构。Elasticsearch的Zen Discovery自成一体。
● Elasticsearch支持自动分片重新平衡,这将使我们能够向集群添加新节点,从而满足开箱即用的线性可扩展性要求。
● Elasticsearch具有内置的结构化查询DSL,而您必须使用第三方库以Solr编程方式创建查询字符串。
● 团队的工程师拥有更多与Elasticsearch合作的经验
Elasticsearch可以工作吗?
Elasticsearch似乎具备了我们想要的一切,并且我们的工程师在过去曾有过使用它的经验。它提供了一种跨不同节点复制数据的方法,以容忍单个节点的故障,通过添加更多节点来扩展群集,并可以吸收要索引的消息而不会费劲。到处阅读,我们听到了一些有关管理大型Elasticsearch集群的恐怖故事,实际上,除了日志记录基础架构之外,我们的后端团队都没有任何管理Elasticsearch集群的经验。
我们想避免这些繁琐的大型集群,因此我们想到了将分片和路由委托给应用程序层的想法,使我们可以将消息索引到较小的Elasticsearch集群池中。这意味着在群集中断的情况下,仅受影响的群集上包含的Discord消息将不可搜索。这还为我们提供了以下优势:如果无法恢复整个群集的数据,则可以丢弃整个群集的数据(系统可以在用户下次执行搜索时懒惰地重新索引Discord服务器)。
组成部分
当文档被大量索引时,Elasticsearch喜欢它。这意味着我们无法为实时发布的消息编制索引。取而代之的是,我们设计了一个队列,其中工作人员在单个批量操作中抓取一堆消息并将它们编入索引。我们认为,从发布消息到可搜索消息之间的微小延迟是一个完全合理的约束。毕竟,大多数用户搜索的都是历史记录而不是刚才所说的消息。

在摄取方面,我们需要一些注意事项:
● 消息队列:我们需要一个队列,我们??可以在消息实时发布时将其放入(供工作人员使用)。
● 索引工作人员:执行实际路由和批量插入的工作人员从队列插入Elasticsearch。
我们已经在Celery之上构建了一个任务排队系统,因此我们也将其用于历史索引工作者。
● 历史索引工作人员:负责在给定服务器中遍历消息历史并将其插入到Elasticsearch索引中的工作人员。
我们还需要快速,轻松地映射Discord服务器的消息将驻留在哪个Elasticsearch集群上并建立索引。我们将此“群集+索引”对称为碎片(不要与索引中的Elasticsearch的本地碎片混淆)。我们创建的映射分为两层:
● 持久性碎片映射:我们将其放在Cassandra上,这是持久性数据的主要数据存储,是事实的来源。
● 分片映射缓存:当我们在工作人员上接收消息时,向Cassandra查询分片是一个很慢的操作。我们将这些映射缓存在Redis中,以便我们可以执行mget操作来快速确定需要将消息路由到的位置。

首次为服务器建立索引时,我们还需要一种方法来选择用于保留Discord服务器消息的碎片。由于分片是应用程序分层的抽象,因此我们可以对如何分配它们有所了解。通过利用Redis的功能,我们使用了排序集来构建负载感知的分片分配器。
● 分片分配器:在Redis中使用排序集,我们保留了一组分片,其得分代表其负荷。得分最低的分片是接下来应该分配的分片。分数随着每次新分配而增加,并且在Elasticsearch中索引的每条消息也都有可能增加其Shard的分数。随着分片中获得更多数据,它们被分配给新Discord服务器的可能性就较小。
当然,如果没有从应用程序层发现集群及其中的主机的方法,那么整个搜索基础架构将是不完整的。
● etcd:我们在系统的其他部分中使用etcd进行服务发现,因此我们也将其用于Elasticsearch集群。由于集群中的节点可以将自己声明到etcd上,以供系统其余部分查看,因此我们不必对任何Elasticsearch拓扑进行硬编码。
最后,我们需要一种让客户能够实际搜索事物的方法。
● 搜索API:客户端可以向其发出搜索查询的API端点。它需要进行所有权限检查,以确保客户端仅搜索他们实际有权访问的消息。

索引和映射数据
在非常高的层次上,在Elasticsearch中,我们有一个“索引”的概念,其中包含许多“碎片”。在这种情况下,分片实际上是Lucene索引。Elasticsearch负责将索引内的数据分发到属于该索引的分片。如果需要,可以使用“路由键”控制数据在分片之间的分配方式。索引也可以包含“复制因子”,即索引(及其中的分片)应复制到的节点数。如果索引所在的节点发生故障,则副本可以接管(不相关但相关,这些副本也可以用于搜索查询,因此您可以通过添加更多副本来扩展索引的搜索吞吐量)。
由于我们在应用程序级别(我们的分片)中处理了所有分片逻辑,因此让Elasticsearch为我们进行分片实际上没有任何意义。但是,我们可以使用它在集群中的节点之间进行索引的复制和平衡。为了让Elasticsearch使用正确的配置自动创建索引,我们使用了索引模板,其中包含索引配置和数据映射。索引配置非常简单:
● 索引只能包含一个分片(不要为我们做任何分片)
● 索引应复制到一个节点(能够容忍索引所在的主节点的故障)
● 索引每60分钟应刷新一次(为什么要这样做,下面将进行说明)。

索引包含一个文档类型:
message

将原始消息数据存储在Elasticsearch中几乎没有意义,因为数据的格式不是易于搜索的格式。相反,我们决定采用每条消息,并将其转换为一堆字段,其中包含有关消息的元数据,我们可以对其进行索引和搜索:

您会注意到,我们没有在这些字段中包含时间戳,并且如果您从我们以前的博客文章中回忆起,我们的ID是Snowflakes,这意味着它们固有地包含时间戳(我们可以在之前,之后和之后使用它来加电)使用最小和最大ID范围进行查询)。

但是,这些字段实际上并没有“存储”在Elasticsearch中,而是仅存储在反向索引中。实际存储和返回的唯一字段是张贴消息的消息,通道和服务器ID。这意味着消息数据在Elasticsearch中不会重复。折衷是,我们必须在返回搜索结果时从Cassandra获取消息,这是完全可以的,因为我们必须从Cassandra中提取消息上下文(前后2条消息)以始终为UI供电。将实际的消息对象保留在Elasticsearch之外意味着我们不必为存储它而额外的磁盘空间。但是,这意味着我们无法使用Elasticsearch突出显示搜索结果中的匹配项。我们必须将标记生成器和语言分析器内置到我们的客户端中以进行突出显示(这确实很容易做到)。

实际编码
我们认为可能不需要微服务来搜索,而是向Elasticsearch公开了一个封装了路由和查询逻辑的库。我们唯一需要运行的附加服务是索引工作程序(它将使用此库来执行实际的索引工作)。暴露给团队其他成员的API表面积也很小,因此,如果确实需要将其转移到它自己的服务中,则可以轻松地将其包装在RPC层中。该库也可以由我们的API工作者导入,以实际执行搜索查询并通过HTTP将结果返回给用户。

对于团队的其他成员,该库暴露了用于搜索消息的最小表面积:
排队要编制索引或删除的消息:

批量索引工作人员中的实时消息(大致):

为了对服务器的历史消息建立索引,一个历史索引作业将执行一个工作单元,并返回继续运行该服务器所需的下一个作业。每个作业代表进入服务器消息历史记录和固定执行单位的光标(在这种情况下,默认值为500条消息)。作业将新游标返回到要索引的下一批消息,如果没有更多工作要做,则返回“无”。为了快速返回大型服务器的结果,我们将历史索引分为两个阶段,即“初始”阶段和“深度”阶段。“初始”阶段为服务器上最近7天的邮件编制索引,并使索引可供用户使用。之后,我们在“深层”阶段对整个历史进行索引,该阶段以较低的优先级执行。本文显示给用户的外观。这些作业在一组芹菜工作者中执行,从而可以在这些工作者执行的其他任务中安排这些工作。大致如下所示:

在生产中进行测试

在对此进行编码并在我们的开发环境上对其进行测试之后,我们决定是时候看看它在生产中的性能了。我们创建了一个包含3个节点的单个Elasticsearch集群,配置了索引工作器,并计划对1,000个最大的Discord服务器进行索引。一切似乎都正常,但是在查看集群中的指标时,我们注意到了两件事:

CPU使用率高于预期。
磁盘使用率增长得太快了,无法索引大量消息。
我们很困惑,在让它运行了一段时间并用完了太多的磁盘空间之后,我们取消了索引作业,并将其命名为通宵。不太正确。
第二天早上回来时,我们注意到磁盘使用量减少了很多。Elasticsearch是否丢弃了我们的数据?我们尝试在我们索引其中一台服务器所在的一台服务器上发出搜索查询。结果返回的很好-而且速度也很快!是什么赋予了?

磁盘使用率快速增长然后逐渐减少

CPU使用率
经过研究后,我们提出了一个假设!默认情况下,Elasticsearch的索引刷新间隔设置为1秒。这就是在Elasticsearch中提供“近实时”搜索功能的原因。每隔一千个索引(跨越一千个索引),Elasticsearch会将内存缓冲区刷新到Lucene段,并打开该段使其可搜索。一整夜,Elasticsearch在空闲时将其生成的大量细小段合并为磁盘上更大(但更节省空间)的段。
测试这一点非常简单:我们将所有索引都放在了集群上,将刷新间隔设置为任意大的数字,然后我们计划对同一服务器进行索引。提取文档时,CPU使用率几乎降为零,并且磁盘使用率没有以惊人的速度增长。晕!
减少刷新间隔后的磁盘使用率

CPU使用率

但是,不幸的是,实际上,关闭刷新间隔是无效的……
刷新困境
显而易见,Elasticsearch的自动近实时索引可用性无法满足我们的需求。可能服务器无需执行单个搜索查询就可以运行数小时。我们需要建立一种方法来控制应用程序层的刷新。我们通过Redis中过期的hashmap做到了这一点。假设Discord上的服务器已在Elasticsearch上共享为共享索引,我们可以构建一个快速映射,该索引随索引一起更新,跟踪是否需要刷新索引(给定要搜索的服务器)。数据结构很简单:存储哈希图的Redis密钥
prefix + shard_key到标记guild_id

值的哈希图,表示需要刷新。回想起来,这可能是一个集合。
因此,索引生命周期变为:
从队列中提取N条消息。
找出这些消息应由其路由到何处guild_id
对相关集群执行批量插入操作。
更新Redis映射,表示该碎片和该碎片中的给定guild_id

s现在已变脏。1小时后使该密钥过期(因为此时Elasticsearch会自动刷新)。
搜索生命周期变成:
如果脏了,请刷新碎片的Elasticsearch索引,并将整个碎片标记为干净。
执行搜索查询并返回结果。
您可能已经注意到,即使我们现在已经在Elasticsearch上显式控制了刷新逻辑,我们仍然让它每小时自动刷新基础索引。如果在我们的Redis映射上发生数据丢失,则系统最多需要一个小时才能自动更正自身。
未来
自1月份部署以来,我们的Elasticsearch基础架构已扩展到2个集群中的14个节点,使用GCP上的n1-standard-8实例类型,每个实例类型具有1TB的Provisioned SSD。文件总数约为260亿。索引速率达到峰值,约为每秒30,000条消息。Elasticsearch毫不费力地处理了它-在我们推出搜索的整个过程中,CPU保持在5-15%。

到目前为止,我们已经能够轻松地向集群添加更多节点。在某个时候,我们将启动更多集群,以便新的Discord服务器被索引到它们上(这要归功于我们的加权分片分发系统)。在我们现有的集群上,随着向集群中添加更多数据节点,我们将需要限制主合格节点的数量。
我们还偶然发现了4个主要指标,用于确定何时需要增长集群:

heap_free :(又名heap_committed — heap_used)当我们用完了可用的堆空间时,JVM被迫执行一个完整的世界各地的GC来快速回收空间。如果无法回收足够的空间,则该节点将崩溃并燃烧。在此之前,JVM将进入一种状态,在这种状态下,随着堆已满,并且在每个完整的GC期间释放的内存太少,JVM会不断地执行世界范围内的GC。我们将其与GC统计信息一起查看,以了解垃圾回收花费了多少时间。
disk_free:显然,当我们用完磁盘空间时,我们需要添加更多节点或更多磁盘空间来处理被索引的新文档。在GCP上,这非常容易,因为我们可以增加磁盘的大小而无需重新启动实例。选择添加新节点还是调整磁盘大小取决于此处提到的其他指标的外观。例如,如果磁盘使用率很高,但其他指标处于可接受的水平,则我们将选择添加更多的磁盘空间而不是新节点。
cpu_usage:如果我们在高峰时段达到CPU使用量的阈值。
io_wait:如果集群上的IO操作变得太慢。
不健康的群集(堆满)
无堆(MiB)
耗用时间GC / s
健康集群
无堆(GiB)
耗用时间GC / s

结论
自我们启动搜索功能以来,距离现在已经有三个多月了,到目前为止,该系统几乎没有遇到任何问题。

Elasticsearch在大约16,000个索引和数百万个Discord服务器中显示了从0到260亿个文档的稳定一致的性能。我们将继续通过向现有集群添加更多集群或更多节点来扩展规模。在某个时候,我们可能会考虑编写代码,使我们能够在群集之间迁移索引,从而减轻群集负载,或者如果Discord服务器是特别健谈的服务器,则可以为Discord服务器提供自己的索引(尽管我们的加权分片系统做得很好确保大型Discord服务器当前通常拥有自己的碎片)。 

 

以上是关于第20篇-不和谐如何索引数十亿条消息的主要内容,如果未能解决你的问题,请参考以下文章

如何使用mongo在sinatra的handsontable中轻松加载数十亿条记录?

如何在不和谐 js 中使用消息获取?

处理数十亿条记录的推荐数据库类型

Kafka不停机,如何无感知迁移ZooKeeper集群?

将 s3 中跨 CSV 文件的数十亿条记录推送到 MongoDb

Firebase 实时数据库能否有效地循环浏览数十亿条帖子并由发布它们的用户检索?