分析Elasticsearch的Aggregation有感(一)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分析Elasticsearch的Aggregation有感(一)相关的知识,希望对你有一定的参考价值。

参考技术A 分析Elasticsearch的Aggregation有感(一)

Elasticsearch除了全文检索之外,引以为傲的便是各种聚合算法,如kibana中最为常用的以时间轴的刻度为桶bucket进行统计、累加等聚合计算。

分析Elasticsearch的聚合统计为什么能做到实时响应?其实原理相当简tong单,主要依赖fielddata(内存常驻,逐渐被doc_values取代),将需要排序的字段记录事先排好序记录在内存中或保存在节点磁盘上;加上集群分布式计算能力,能做到聚合计算极快。

桶中桶的聚合计算则是Elasticsearch的噩梦,经常在不怎么大的数据量下出现OOM;Elasticsearch所有的计算都采用以下方式;

当请求发现集群中的任何一个节点,该节点必须负责将数据进行整理汇聚,再返回给客户端,也就是一个简单的节点上进行Map计算,在一个固定的节点上进行Reduces得到最终Map集合向客户端返回。

当进行桶中桶计算时,Elasticsearch将该过程只进行了简单拆分,分别计算出两个聚合的桶的结果集,再进行两个结果集的Join。

也就是Hadoop进行Join计算时的Redece端进行Join计算,这时所汇聚的数据量以及Join计算时K(k1-k2)所产生的新的键值急速膨胀,最终导致汇聚节点的OOM。

Elasticsearch当前始终坚持采用简单的查询发起节点负责数据汇聚,跟Elasticsearch及搜索引擎技术的特点有关,即保证TopN的检索效率,每个计算节点只返回各自的TopN,再由汇聚节点整合计算出TopN,这样节点向汇聚节点所传输的数据始终较小;这样设计带来的问题是,无法进行复杂的计算,如桶中桶,Any Join等这些其他类型的数据仓库所具备的功能。

当前解决这个问题的办法,首先能想到的就是与分布式计算引擎来结合,复杂的计算交给分布式计算引擎来完成,所以自然出现了Elasticsearch-Hadoop的连接组件。但尝试使用过Elasticsearch-Hadoop的人最终都放弃了,原因是当前Hadoop与Elasticsearch结合时,仅仅把Elasticsearch当前类似Txt类型的存储,进行计算时Hadoop的Map任务通过Elasticsearch-Hadoop提供InputFormat,只是简单通过Elasticsearch的Scroll对数据进行全量的读取。

这里我们测试过一般硬件配置配置(32核,128G内存,3*4TB硬盘,千兆网卡)组成的4节点集群,最大的Scroll性能只能到20W-30W条/s(1k每条记录);简单计算下,当需要分析10亿级别的数据时,光数据从Elasticsearch集群加载到Hadoop集群所需要的时间是多少。

所以只是将Elasticsearch当作普通存储来进行两个集群的结合显然不合适,如何发挥两个集群各种的计算特点来适应各种不同的计算需求,下面来看看我们的研究方向:

修改Elasticsearch的底层计算逻辑,在进行复杂计算时,不是采用简单的计算任务拆分,下发计算,再汇聚这样粗暴的方式,而是类似Hadoop上的优化,在进行第一层节点计算后,中间在穿插一层shuffle过程,将需要进行Join计算的Maps,进行相应的排序,迁移评估,再执行迁移,保证数据在节点间最小迁移的情况下,再在迁移后的节点上进行Join,再进行Reduce,是不是已经晕了。所以暂时我们也没有计算对这部分进行如此彻底的修改。

        如何将Elastcicearch如何与Hadoop的有机结合,但不是如何提高scroll速度或Map任务直接对Lucese文件进行直接的IO等,将数据全量读取到Hadoop集群,而接下来的任何分析都与Elasticsearch没有任何关系的做法。

根据Elasticsearch数据shard的分布,设置Hadoop的Map任务,保持Map采用Local方式访问一个或多个分片,将Map操作的数据流控制在Local上。

public List getSplits(JobContext job) throws IOException

// getshards splits

List originalSplits =ElasticserchCatShards(job);

// Get active servers

String[] servers = getActiveServersList(job);

if (servers == null )

return null ;

// reassign splits to active servers

List splits = new ArrayList(originalSplits.size());

int numSplits = originalSplits.size();

int currentServer = 0;

for ( int i = 0; i < numSplits; i++, currentServer = i>getNextServer(currentServer, servers.length))

String server = servers[currentServer]; // Current server

boolean replaced = false ;

// For every remaining split

for (InputSplitsplit : originalSplits)

FileSplit fs = (FileSplit)split;

// For every split location

for (String l : fs.getLocations())

// If this split is local to the server

if (l.equals(server))

// Fix split location

splits.add( new FileSplit(fs.getPath(), fs.getStart(),

fs.getLength(), new String[] server));

originalSplits.remove(split);

replaced = true ;

break ;





if (replaced)

break ;



// If no local splits are found for this server

if (!replaced)

// Assign first available split to it

FileSplit fs = (FileSplit)splits.get(0);

splits.add( new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(),

new String[] server));

originalSplits.remove(0);





return splits;



对计算任务进行拆分,在进行底层数据Input时,采用的scroll+query方式,指定分片进行查询。

publicbooleannext(Kkey, Vvalue)throwsIOException

if(scrollQuery==null)

if(beat!=null)

beat.start();



#set querey shards and host:127.0.0.1

scrollQuery=queryBuilder.build(client, scrollReader,shards,host);

size=scrollQuery.getSize();

if(log.isTraceEnabled())

log.trace(String.format("Received scroll [%s], size [%d] for query [%s]", scrollQuery, size, queryBuilder));





booleanhasNext=scrollQuery.hasNext();

if(!hasNext)

returnfalse;



Object[] next=scrollQuery.next();

// NB: the left assignment is not needed since method override

// the writable content however for consistency, they are below

currentKey=setCurrentKey(key, next[0]);

currentValue=setCurrentValue(value, next[1]);

// keep on counting

read++;

returntrue;



          当然这样的做法,还是无法彻底解决单个shards数据量过大的情况下,单个Map任务加载速度过慢情况的出现。通过Demo测试,性能要较原生的Elasticsearch-Hadoop控件有50倍左右提升。

          研究的另一个方向是对doc_values数据文件的分析,doc_values文件的设计是解决fielddata占用内存过大,通过分析doc_value和fielddata,一个字段的数据进行排序存储在内存和磁盘,其不就是天生的列式存储么!采用将Map任务直接对Doc_value文件的读取加载,理论上是可以绕过Elasticsearch的计算节点的,需要我们小伙伴们加快研究步伐,解决Elastticsearch无法进行复杂计算的痛病,至少实现桶中桶,在进行soc分析经常被提及的需求。

->

如何改进Elasticsearch用于日志分析?

作者 | 黎吾平
整理 | CNUTCon 全球运维技术大会
Elasticsearch 本身的定位并不是一个日志分析系统。如何应用 Elasticsearch 做日志分析呢?

2018 年 10 月 6 日,Elastic 公司上市,这也是开源软件领域为数不多成功上市的公司。Elastic 公司以开源搜索产品 Elasticsearch 而闻名。

Elasticsearch 是一个分布式可扩展的实时搜索和分析引擎,被全球众多知名公司使用,有各种各样的场景用例。当应用在 Uber、Instacart 和 Tinder 时,它使骑手与附近的司机配对,为在线购物者提供相关的结果和建议,或者匹配他们可能喜欢的人;当应用在传统 IT、运营和安全部门时,它被用于聚合定价、报价和商业数据,每天处理十亿日志事件,为数千个设备和关键数据提供网络安全操作。

那么,Elasticsearch 和日志分析技术又能擦出怎样的火花呢?它本身的定位并不是一个日志分析系统,用于日志分析中,它能发挥什么作用?要做何优化和改进?市场上还有哪些开源搜索引擎?与它们相比,Elasticsearch 有哪些优缺点?InfoQ 采访到了日志易技术副总裁黎吾平,请他聊聊 Elasticsearch 与日志分析的二三事。

 InfoQ:日志分析技术可以解决什么问题?

黎吾平: 日志在计算机系统中是一个非常广泛的概念,任何程序都有可能输出日志:操作系统内核、各种应用服务器等。日志的内容、规模和用途也各不相同。这里仅列举几个场景。

在 Web 或者 App 中,很多时候采用日志来记录用户的访问行为。通过日志可对服务的访问量进行分析,对系统进行改进,对用户进行画像分析,后续可用于广告、推荐等业务等。

很多企业内部软件中,会记录用户的所有操作行为,这部分日志可用于用户审计或企业安全的目的。

此外,对于开发或者运维人员,查日志可以算得上是故障分析、监控告警等等场景的第一选择。

 InfoQ:日志分析有哪些方法?

黎吾平: 在数据规模比较小的情况下,可以使用单机的脚本进行处理,这种方法简单快速,但当对日志进行多种不同的分析的时候,可能会导致大量重复的代码,用于进行数据解析和清洗,这个时候可能使用更合适的方法——比如采用数据库。

使用数据库来进行日志分析,一个很重要的点就是如何将各种异构的日志文件导入到的数据库中,因为数据库首先需要按照固定格式创建表结构——这个过程通常称为 ETL。当数据导入后,可以采用大家熟悉的 SQL 进行日志的各种分析。

对于数据规模比较大的情况,一般会使用分布式技术。分布式的方式之一是采用 hadoop 存储日志数据,后续采用 MapReduce 的 job 或者 spark 等进行分析。如果希望采用 SQL 的方案来分析,可以采用 Hive 这些类似数据库的系统。这类系统适合于批量的进行分析。如果需要实时分析则需要引入一些实时的处理系统。

还有一种基于搜索引擎的方案日志的分析,目前国内就出现了很多基于 Elasticsearch 进行日志分析的用法,行业内也有一些商用的产品。如国外 Splunk、Sumo Logic,国内的日志易。

 InfoQ:日志分析中,大家最常遇到的问题有哪些?有什么解决方法?

黎吾平:日志分析最大的问题是字段抽取复杂。首先对所有日志进行字段抽取的工作量很大,其次日志会随着产品版本更新发生变化,而且日志入库时很难预知后续的分析对字段的需求。因此需要一个功能强大且易用的日志抽取功能,以及搜索时按需临时抽取字段的能力。

其次是需要灵活可配置的分析能力。 如果每次分析都需要写代码进行,工作量大而且使用的门槛比较高。对于数据库的方案可以采用 SQL,一些商用的产品都推出了自己的语言,比如 SPL(Search Processing Language)。

另外几个常见的问题就是实时性和性能问题。随着日志数据量的增长,想要实时监控分析数据,就必然会影响系统性能。这二者的平衡与优化,主要受运维工作经验的影响。

 InfoQ:在日志分析中搜索引擎的作用是什么?

黎吾平: 日志分析中的搜索引擎主要用于数据的读写:即实时的接收最新产生的日志数据,并进行索引,以实时或者准实时的方式提供给用户进行搜索和统计分析。

搜索引擎相比较一些类似数据库的系统,主要特点是:支持全文检索,搜索更快,处理数据的性能更强,实时性也非常好。

 InfoQ:日志分析中,常用的搜索引擎有哪些,Elasticsearch 有什么优点?

黎吾平: 开源搜索引擎项目主要有 Apache 社区的 Solr、雅虎开源的 Vespa、LinkedIn 开源的 Sensei 和 Elasticsearch 等。Elasticsearch 目前应该可以说是广泛应用于日志的分析,目前不少国内的公司,包括互联网的公司都在使用。

在日志分析领域,Elasticsearch 的优点主要表现为灵活易用、插件扩展、准实时搜索、具有一定的统计分析功能上。比如说,Elasticsearch 支持大量的 Aggregation、丰富的 Restful 接口。还有就是,Elasticsearch 社区活跃,很多问题能够在社区里快速解决。

 InfoQ:匹配日志分析场景,为什么需要对 Elasticsearch 进行改进?做了哪些方向的改进?

黎吾平: 首先,日志易一直以来是 ES 的深度用户。不少客户的数据量非常大,所部署的产品集群规模达到上百台机器,每天新产生的数据数十 TB,我们的产品大都是部署在用户的生产环境,相比较自己公司内部使用 ES,故障处理、功能调试要困难的多,所以我们对功能、性能、稳定性的要求是非常高的。

日志易对 Elasticsearch 进行主要自以下方面入手:

一是针对 Elasticsearch 功能、性能、稳定性的优化。Elasticsearch 实际使用过程中,出现的问题较多。比如 Elasticsearch 中不允许出现字段类型冲突。在索引时,Elasticsearch 要检查新的字段在整个集群内是否存在类型冲突,这会导致入库时数据堆积、集群卡死。这要求根据具体的使用场景,针对 ES 进行优化:有 Mapping 更新逻辑、增加长期索引存储、优化多存储路径时 Shard 分配逻辑等。此外,还改进优化了 Elasticsearch 调用的 Lucene 搜索库,涉及数百个源代码文件,过万行源代码。

二是日志搜索引擎的专业化。Elasticsearch 本身的定位是一个通用的搜索引擎,而不一个专用的日志分析系统,因此也有很多的功能并不是日志分析需要的。比如说,日志是不可变的,日志基本不需要相关性。事实上,因为 Elasticsearch 要面对更广泛的使用场景,之前提到的一些优点,背后是隐含着限制条件的。

日志易的日志搜索引擎,大幅提升了日志搜索的性能。这里列举一下搜索引擎主要的改进:

字段冲突的优化。Elasticsearch 是 schemaless,less 不是 no,实际上依然会有字段冲突的问题。这点在格式经常变动的日志数据上,问题非常显著。日志易在这块做了自己的处理。

统计分析和 RESTful 接口的改进。 可谓”成也萧何败也萧何”,Elasticsearch 所提供的统计分析和 RESTful 接口,一旦面临复杂的需求分析,用户可能就要陷入 JSON 地狱。当然,在这点上,日志易也做出了相应的处理。

实时性和性能也是大规模集群的改进优化重点。 我们重新开发的引擎在 Replica 策略、Segment 合并策略、消重策略、DocValues 统计上,都做了不同程度的改造优化。

想要共同探讨 Elasticsearch 优化经验的朋友,欢迎在 11 月份的上海,于 InfoQ 主办的 CNUTCon 全球运维技术大会日志处理专题论坛与我交流。

 InfoQ:近期,Elasticsearch 上市了,你怎么看待它的前景?

黎吾平:Elasticsearch 近期在纽交所上市,可以说是开源软件领域为数不多成功上市的公司,上市以后,Elasticsearch 在品牌影响力、资金实力、人才吸引上都会更进一步提升。随之而来是更大的投入,这肯定会提高搜索引擎在技术社区的影响力,也提高 IT 技术人员对基于搜索引擎的日志分析技术的认可。这个领域的未来应该会持续繁荣,我也期待有更多的新场景涌现出来。

以上是关于分析Elasticsearch的Aggregation有感(一)的主要内容,如果未能解决你的问题,请参考以下文章

显示所有 Elasticsearch 聚合结果/桶,而不仅仅是 10 个

Elasticsearch源码分析-索引分析(一)

Elasticsearch Analyzer详解

ElasticSearch:分析器

如何改进Elasticsearch用于日志分析?

ElasticsearchNori:官方的韩语分析插件Elasticsearch