es使用与原理6 -- 聚合分析剖析

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了es使用与原理6 -- 聚合分析剖析相关的知识,希望对你有一定的参考价值。

参考技术A

有些聚合分析的算法,是很容易就可以并行的,比如说max

有些聚合分析的算法,是不好并行的,比如说,count(distinct),并不是说,在每个node上,直接就出一些distinct value,就可以的,因为数据可能会很多,假设图中的协调节点3百万个数据去重后还剩下100万distinct的数据,那么内存需要来存储这100万条数据,这是不可能的

es会采取近似聚合的方式,就是采用在每个node上进行近估计的方式,得到最终的结论,cuont(distcint),100万,1050万/95万 --> 5%左右的错误率
近似估计后的结果,不完全准确,但是速度会很快,一般会达到完全精准的算法的性能的数十倍

precision_threshold优化准确率和内存开销

brand去重,如果brand的unique value,在100个以内,小米,长虹,三星,TCL,HTL。。。
在多少个unique value以内,cardinality,几乎保证100%准确
cardinality算法,会占用precision_threshold * 8 byte 内存消耗,100 * 8 = 800个字节
占用内存很小。。。而且unique value如果的确在值以内,那么可以确保100%准确
100,数百万的unique value,错误率在5%以内
precision_threshold,值设置的越大,占用内存越大,1000 * 8 = 8000 / 1000 = 8KB,可以确保更多unique value的场景下,100%的准确
field,去重,count,这时候,unique value,10000,precision_threshold=10000,10000 * 8 = 80000个byte,80KB

doc value正排索引
搜索+聚合 是怎么实现的?
假设是倒排索引实现的

倒排索引来实现是非常不现实的,因为我们搜索的那个字段search_field 有可能是分词的,这就需要去扫描整个索引才能实现聚合操作,效率是及其低下的。
正排索引结构:
doc2: agg1
doc3: agg2
1万个doc --> 搜 -> 可能跟搜索到10000次,就搜索完了,就找到了1万个doc的聚合field的所有值了,然后就可以执行分组聚合操作了
doc value原理

1、doc value原理

(1)index-time生成

PUT/POST的时候,就会生成doc value数据,也就是正排索引

(2)核心原理与倒排索引类似

正排索引,也会写入磁盘文件中,然后呢,os cache先进行缓存,以提升访问doc value正排索引的性能
如果os cache内存大小不足够放得下整个正排索引,doc value,就会将doc value的数据写入磁盘文件中

(3)性能问题:给jvm更少内存,64g服务器,给jvm最多16g

es官方是建议,es大量是基于os cache来进行缓存和提升性能的,不建议用jvm内存来进行缓存,那样会导致一定的gc开销和oom问题
给jvm更少的内存,给os cache更大的内存
64g服务器,给jvm最多16g,几十个g的内存给os cache
os cache可以提升doc value和倒排索引的缓存和查询效率

2、column压缩

doc1: 550
doc2: 550
doc3: 500

合并相同值,550,doc1和doc2都保留一个550的标识即可
(1)所有值相同,直接保留单值
(2)少于256个值,使用table encoding模式:一种压缩方式
(3)大于256个值,看有没有最大公约数,有就除以最大公约数,然后保留这个最大公约数

重点:
对分词的field,直接执行聚合操作,会报错,大概意思是说,你必须要打开fielddata,然后将正排索引数据加载到内存中,才可以对分词的field执行聚合操作,而且会消耗很大的内存
先修改 字段的fielddata属性为true,再查 就能查找到数据

当然,我们也可以使用内置field(keyword)不分词,对string field进行聚合,如果对不分词的field执行聚合操作,直接就可以执行,不需要设置fieldata=true

分词field+fielddata的工作原理

doc value --> 不分词的所有field,可以执行聚合操作 --> 如果你的某个field不分词,那么在index-time,就会自动生成doc value --> 针对这些不分词的field执行聚合操作的时候,自动就会用doc value来执行
分词field,是没有doc value的。。。在index-time,如果某个field是分词的,那么是不会给它建立doc value正排索引的,因为分词后,占用的空间过于大,所以默认是不支持分词field进行聚合的
分词field默认没有doc value,所以直接对分词field执行聚合操作,是会报错的

对于分词field,必须打开和使用fielddata,完全存在于纯内存中。。。结构和doc value类似。。。如果是ngram或者是大量term,那么必将占用大量的内存。。。

如果一定要对分词的field执行聚合,那么必须将fielddata=true,然后es就会在执行聚合操作的时候,现场将field对应的数据,建立一份fielddata正排索引,fielddata正排索引的结构跟doc value是类似的,
但是只会讲fielddata正排索引加载到内存中来,然后基于内存中的fielddata正排索引执行分词field的聚合操作

如果直接对分词field执行聚合,报错,才会让我们开启fielddata=true,告诉我们,会将fielddata uninverted index,正排索引,加载到内存,会耗费内存空间

为什么fielddata必须在内存?因为大家自己思考一下,分词的字符串,需要按照term进行聚合,需要执行更加复杂的算法和操作,如果基于磁盘和os cache,那么性能会很差

我们是不是可以预先生成加载fielddata到内存中来???
query-time的fielddata生成和加载到内存,变为index-time,建立倒排索引的时候,会同步生成fielddata并且加载到内存中来,这样的话,对分词field的聚合性能当然会大幅度增强

Spark实战_SparkContext原理剖析与源码分析

TaskScheduler的初始化机制

TaskScheduler,如何注册Application,executor如何反向注册?

TaskScheduler的初始化机制
  1. createTaskScheduler(),内部会创建三个东西。

  2. 一是TaskSchedulerImpl,它其实就是我们所说的TaskScheduler。

  3. 二是SparkDeploySchedulerBackend,它在底层会负责接收TaskSchedulerImpl的控制,实际上负责与Master的注册,Ececutor的反注册,task发送到executor等操作。

  4. 调用TaskSchedulerImpl的init()方法,创建SchedulerPool,当DAGScheduler要让TaskScheduler去调度一些任务的时候,就会把这些任务放到调度池里面,它有不同的优先策略,比如FIFO。

  5. 调用TaskSchedulerImpl的start()方法,方法内部调用SparkDeploySchedulerBackend的start()方法。

  6. SparkDeploySchedulerBackend的start()方法,创建一个东西,AppClient。

  7. AppClient,启动一个线程,创建一个ClientActor。

  8. ClientActor线程,调用两个方法,registerWithMaster()——>tryRegisterAllMasters()。

  9. registerWithMaster()——>tryRegisterAllMasters(),向MasterActor发送RegisterApplication(case class,里面封装了Application的信息)。

  10. RegisterApplication发送数据到Spark集群的Master——>Worker——>Executor。

  11. Executor反向注册到SparkDeploySchedulerBackend上面去。

TaskSchedulerImpl底层实际主要基于SparkDeploySchedulerBackend来工作。

DAGScheduler

DAGSchedulerEventProcessActor,DAGScheduler底层基于该组件进行通信。(线程)

SprkUI

SprkUI,显示Application运行的状态,启动一个jetty服务器,来提供web服务,从而显示网页。

源码分析

package org.apache.spark,SparkContext.scala

// Create and start the scheduler
 private[spark] var (schedulerBackend, taskScheduler) =
   SparkContext.createTaskScheduler(this, master)
// 这是我们常用的Sparkt提交模式中的standalone方式
     case SPARK_REGEX(sparkUrl) =>
       val scheduler = new TaskSchedulerImpl(sc)
       val masterUrls = sparkUrl.split(",").map("spark://" + _)
       val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
       scheduler.initialize(backend)
       (backend, scheduler)

package org.apache.spark.scheduler,TaskSchedulerImpl.scala

/**
 * 1、底层通过操作一个SchedulerBackend,针对不同种类的cluster(standlalone、yarn、mesos),调度task
 * 2、它也可以通过使用一个LacalBackend,并且将isLocal参数设置为true,来在本地模式下工作
 * 3、它负责处理一些通用的逻辑,比如说决定多个job的调度顺序,启动推测任务执行
 * 4、客户端首先应该调用它的initialize()方法和start()方法,然后通过runTasks()方法提交task sets
 */

def initialize(backend: SchedulerBackend) {
   this.backend = backend
   // temporarily set rootPool name to empty
   rootPool = new Pool("", schedulingMode, 0, 0)
   schedulableBuilder = {
     schedulingMode match {
       case SchedulingMode.FIFO =>
         new FIFOSchedulableBuilder(rootPool)
       case SchedulingMode.FAIR =>
         new FairSchedulableBuilder(rootPool, conf)
     }
   }
   schedulableBuilder.buildPools()
 }

start()方法,

// start()方法,sparkContext.scala
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
 // constructor
 taskScheduler.start()
// TaskSchedulerImpl.scala
override def start() {
   backend.start()
// SparkDeploySchedulerBackend.scala    
override def start() {
   super.start()
// 这个ApplicationDescription,非常重要
   // 它就代表了当前执行的这个application的一些情况
   // 包括application最大需要多少cpu core,每个slave上需要多少内存
   val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
     appUIAddress, sc.eventLogDir, sc.eventLogCodec)
   // 创建了AppClient
   client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
   client.start()

package org.apache.spark.deploy.client,AppClient.scala,

/**
 * 这是一个接口
 * 它负责为application与Spark集群进行通信
 * 它会接收一个spark master的url,以及一个ApplicationDescripition,和一个集群事件的监听器,以及各种事件发生时,
 * 监听器的回调函数
 */

def start() {
   // Just launch an actor; it will call back into the listener.
   actor = actorSystem.actorOf(Props(new ClientActor))
 }

package org.apache.spark.scheduler,DAGScheduler

@volatile private[spark] var dagScheduler: DAGScheduler = _
 try {
   dagScheduler = new DAGScheduler(this)
 } catch {
   case e: Exception => {
     try {
       stop()
     } finally {
       throw new SparkException("Error while constructing DAGScheduler", e)
     }
   }
 }
/**
 * 实现了面向stage的调度机制的高层次的调度层。它会为每个job计算一个stage的DAG(有向无环图),
 * 追踪RDD和stage的输出是否被物化了(物化就是写入了磁盘或内存等地方),并且寻找一个最少消耗(最优、最小)调度机制来运行job。
 * 它会将stage作为tasksets提交到底层的TaskSchedulerImpl上,来在集群上运行它们(task)。
 *
 * 除了stage的DAG,它还负责决定运行每个task的最佳位置,基于当前的缓存状态,将这些最佳位置提交给底层的TaskSchedulerImpl。
 * 此外,它还会处理由于shuffle输出文件丢失导致的失败,在这种情况下,旧的stage可能会被重新提交。
 * 一个stage内部的失败,如果不是由于shuffle文件丢失所导致的,会被TaskScheduler处,它会多次重试每一个task,直到最后,实在不行了,
 * 才会去取消整个stage。
 */

the Spark UI,

// Initialize the Spark UI
 private[spark] val ui: Option[SparkUI] =
   if (conf.getBoolean("spark.ui.enabled", true)) {
     Some(SparkUI.createLiveUI(this, conf, listenerBus, jobProgressListener,
       env.securityManager,appName))
   } else {
     // For tests, do not enable the UI
     None
   }

本文首发于steem,感谢阅读,转载请注明。

https://steemit.com/@padluo


数据分析

读者交流电报群

https://t.me/sspadluo


知识星球交流群

知识星球读者交流群


以上是关于es使用与原理6 -- 聚合分析剖析的主要内容,如果未能解决你的问题,请参考以下文章

ES聚合分析(聚合分析简介指标聚合桶聚合)

ES的常用查询与聚合

ELK技术栈ElasticSearch,Logstash,Kibana

跟我学Elasticsearch(7) es的嵌套聚合,下钻分析,聚合分析

ES学习--嵌套聚合下钻分析聚合分析

elasticsearch系列六:聚合分析(聚合分析简介指标聚合桶聚合)