SPARK的计算向量化-spark本身的向量化

Posted 鸿乃江边鸟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SPARK的计算向量化-spark本身的向量化相关的知识,希望对你有一定的参考价值。

背景

我们知道,随着计算引擎战争的结束(SPARK赢得了离线处理的霸权),越来越多的公司致力于性能的优化,而引擎的优化,目前直指计算的向量化,
这片文章来说说spark本身对于向量化的实现。

spark本身的优化

我们都知道spark的Tungsten项目,这个项目中有一点就是Code Generation(代码生成)。代码生成除了消除虚函数的调用等功能外,其实在向量化这块也是做了处理的。
直接跳到ColumnarToRowExec代码:

val columnarBatchClz = classOf[ColumnarBatch].getName
    val batch = ctx.addMutableState(columnarBatchClz, "batch")
  ...
 val localIdx = ctx.freshName("localIdx")
    val localEnd = ctx.freshName("localEnd")
    val numRows = ctx.freshName("numRows")
    val shouldStop = if (parent.needStopCheck) 
      s"if (shouldStop())  $idx = $rowidx + 1; return; "
     else 
      "// shouldStop check is eliminated"
    
    s"""
       |if ($batch == null) 
       |  $nextBatchFuncName();
       |
       |while ($limitNotReachedCond $batch != null) 
       |  int $numRows = $batch.numRows();
       |  int $localEnd = $numRows - $idx;
       |  for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) 
       |    int $rowidx = $idx + $localIdx;
       |    $consume(ctx, columnsBatchInput).trim
       |    $shouldStop
       |  
       |  $idx = $numRows;
       |  $batch = null;
       |  $nextBatchFuncName();
       |
     """.stripMargin

spark中向量化的核心就在于这块代码中,这块代码主要的就是ColumnarBatch,也就是列批,这种列批的数据结构,用FOR循环这种方式进行数据的访问,
这在JIT中会进行优化(优化成向量化)。
而这里还有一个重点就是:Parquet或者ORC这种列式存储,读取出来的时候,天然就是一个列批的数据结构,很方便做向量化操作。

但是,利用JIT进行向量化是有缺点的:
利用了JIT进行优化,这个是需要编译器追踪循环的次数的,如果循环次数不够,就不会进行进行JIT,也就无法做到向量化。
所以好多公司把这种着力于用其他语句实现来进行真正意义上的向量化。

参考

本文参考了

  1. 深度解读|Spark 中 CodeGen 与向量化技术的研究
  2. Velox: 现代化的向量化执行引擎

以上是关于SPARK的计算向量化-spark本身的向量化的主要内容,如果未能解决你的问题,请参考以下文章

SPARK Parquet嵌套类型的向量化支持以及列索引(column index)

SPARK Parquet嵌套类型的向量化支持以及列索引(column index)

Spark / Hive / ClickHouse 向量化查询执行原理分析(Vectorization Query Execution)

一文了解 ClickHouse 的向量化执行

我们是不是需要 C++ 中的向量化或 for 循环已经足够快?

Apache Doris 向量化设计与实现