SPARK最新特性Runtime Filtering(运行时过滤)以及与动态分区裁剪的区别

Posted 鸿乃江边鸟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SPARK最新特性Runtime Filtering(运行时过滤)以及与动态分区裁剪的区别相关的知识,希望对你有一定的参考价值。

背景

本文基于 SPARK 3.3.0
在最新发布的SPARK RELEASE,第一个显著的特性就是row-level Runtime Filtering,我们来分析一下

分析

直接转到对应的 Jira SPARK-32268,里面涉及到的TPC benchmark,在数据行数比较大的情况下,BloomFilter带来的性能提升还是很明显的,最重要的设计文档在Row-level Runtime Filters in Spark,
里面讲了两个关键点:

  • Runtime Filter涉及两种模式,一种是In Filter(In Filter会转换为Semi Join),一种是Bloom Filter
  • Bloom Filter的参数控制 n_items(多少行数据),n_bits(多个位来标识)

代码实现

该功能涉及到两个主要的Rule InjectRuntimeFilter和 RewritePredicateSubquery

      Batch("InjectRuntimeFilter", FixedPoint(1),
      InjectRuntimeFilter,
      RewritePredicateSubquery) :+

第一个规则InjectRuntimeFilter,会根据spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabledspark.sql.optimizer.runtime.bloomFilter.enabled 配置项来开启是否进行RunTime Filter的转换(默认情况下是关闭的)。

private def tryInjectRuntimeFilter(plan: LogicalPlan): LogicalPlan = 
    var filterCounter = 0
    val numFilterThreshold = conf.getConf(SQLConf.RUNTIME_FILTER_NUMBER_THRESHOLD)
    plan transformUp 
      case join @ ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, _, _, left, right, hint) =>
        var newLeft = left
        var newRight = right
        (leftKeys, rightKeys).zipped.foreach((l, r) => 
          // Check if:
          // 1. There is already a DPP filter on the key
          // 2. There is already a runtime filter (Bloom filter or IN subquery) on the key
          // 3. The keys are simple cheap expressions
          if (filterCounter < numFilterThreshold &&
            !hasDynamicPruningSubquery(left, right, l, r) &&
            !hasRuntimeFilter(newLeft, newRight, l, r) &&
            isSimpleExpression(l) && isSimpleExpression(r)) 
            val oldLeft = newLeft
            val oldRight = newRight
            if (canPruneLeft(joinType) && filteringHasBenefit(left, right, l, hint)) 
              newLeft = injectFilter(l, newLeft, r, right)
            
            // Did we actually inject on the left? If not, try on the right
            if (newLeft.fastEquals(oldLeft) && canPruneRight(joinType) &&
              filteringHasBenefit(right, left, r, hint)) 
              newRight = injectFilter(r, newRight, l, left)
            
            if (!newLeft.fastEquals(oldLeft) || !newRight.fastEquals(oldRight)) 
              filterCounter = filterCounter + 1
            
          
        )
        join.withNewChildren(Seq(newLeft, newRight))
    
  

适用RunTime Filter得有一下几个前提:

  • 没超过最大插入Runtime Filter的阈值,spark.sql.optimizer.runtimeFilter.number.threshold(
    默认是10)
  • 逻辑计划没有插入动态分区,没有插入Runtime Filter以及该key仅仅是简单的join条件(先做简单的)
  • 应用于Filter的join的一方能够通过join和aggerate 下推
  • Filter创建的join一方有一个可选择行的操作(用来过滤应用于Filter一方的数据)
  • 应用于Filter的join的一方的scan的文件大小必须大于某个阈值spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold默认是10GB(这样才能达到好的过滤效果)
  • 要么当前的是一个shuffle Join或者是一个存在shuffle的broadcast join只有在这些情况下才能够插入Filter进行过滤)
    注意:最后一个条件,单从代码上是看不出来为什么是broadcast join的,难道hash join不行么?.
    下面的分析会解释:
  private def injectFilter(
      filterApplicationSideExp: Expression,
      filterApplicationSidePlan: LogicalPlan,
      filterCreationSideExp: Expression,
      filterCreationSidePlan: LogicalPlan): LogicalPlan = 
    require(conf.runtimeFilterBloomFilterEnabled || conf.runtimeFilterSemiJoinReductionEnabled)
    if (conf.runtimeFilterBloomFilterEnabled) 
      injectBloomFilter(
        filterApplicationSideExp,
        filterApplicationSidePlan,
        filterCreationSideExp,
        filterCreationSidePlan
      )
     else 
      injectInSubqueryFilter(
        filterApplicationSideExp,
        filterApplicationSidePlan,
        filterCreationSideExp,
        filterCreationSidePlan
      )
    
  

默认情况下是会插入InSubquery节点的,这里有个很重要的判断:

  private def injectInSubqueryFilter(
      filterApplicationSideExp: Expression,
      filterApplicationSidePlan: LogicalPlan,
      filterCreationSideExp: Expression,
      filterCreationSidePlan: LogicalPlan): LogicalPlan = 
    require(filterApplicationSideExp.dataType == filterCreationSideExp.dataType)
    val actualFilterKeyExpr = mayWrapWithHash(filterCreationSideExp)
    val alias = Alias(actualFilterKeyExpr, actualFilterKeyExpr.toString)()
    val aggregate = Aggregate(Seq(alias), Seq(alias), filterCreationSidePlan)
    if (!canBroadcastBySize(aggregate, conf)) 
      // Skip the InSubquery filter if the size of `aggregate` is beyond broadcast join threshold,
      // i.e., the semi-join will be a shuffled join, which is not worthwhile.
      return filterApplicationSidePlan
    
    val filter = InSubquery(Seq(mayWrapWithHash(filterApplicationSideExp)),
      ListQuery(aggregate, childOutputs = aggregate.output))
    Filter(filter, filterApplicationSidePlan)
  

if (!canBroadcastBySize(aggregate, conf)),如果filter的create的一方不能够进行在运行时进行broadcast的转换,那么就跳过。原因是如果不能够进行broadcast join的转换的话,那么就会是shuffle join,这种情况下会得不偿失。对于InSubquery转换为SemiJoin是在RewritePredicateSubquery 规则中的(后续SemiJoin会转BroadcastJoin)。
对于开启了BloomFilter的情况下,就会运行injectBloomFilter代码:

  private def injectBloomFilter(
      filterApplicationSideExp: Expression,
      filterApplicationSidePlan: LogicalPlan,
      filterCreationSideExp: Expression,
      filterCreationSidePlan: LogicalPlan): LogicalPlan = 
    // Skip if the filter creation side is too big
    if (filterCreationSidePlan.stats.sizeInBytes > conf.runtimeFilterCreationSideThreshold) 
      return filterApplicationSidePlan
    
    val rowCount = filterCreationSidePlan.stats.rowCount
    val bloomFilterAgg =
      if (rowCount.isDefined && rowCount.get.longValue > 0L) 
        new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideExp)),
          Literal(rowCount.get.longValue))
       else 
        new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideExp)))
      
    val aggExp = AggregateExpression(bloomFilterAgg, Complete, isDistinct = false, None)
    val alias = Alias(aggExp, "bloomFilter")()
    val aggregate =
      ConstantFolding(ColumnPruning(Aggregate(Nil, Seq(alias), filterCreationSidePlan)))
    val bloomFilterSubquery = ScalarSubquery(aggregate, Nil)
    val filter = BloomFilterMightContain(bloomFilterSubquery,
      new XxHash64(Seq(filterApplicationSideExp)))
    Filter(filter, filterApplicationSidePlan)
  

这里有个spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold的判断(默认是10M),如果大于该值,就直接跳过,因为如果Filter 创建的一方过大的话,Bloom Filter的误报率会上升,达到的效果就没有那么好了。
这里会构建BloomFilterAggregateBloomFilterMightContain以及ScalarSubqueryScalarSubquery最终会通过PlanSubqueries转化为execution.ScalarSubquery(SubqueryExec)形式,从而在driver端把数据收集过来,继而在BloomFilterMightContain进行计算

与动态分区裁剪的区别

  • 动态分区裁剪是被裁剪的join一边必须是分区的,而且join另一边在exchange之前存在条件过滤,而且默认是存在broadcastJoin的时候,才会进行分区裁剪;Runtime Filter 没有限制,但是Runtime Filter的适用条件更加严格
  • 动态分区裁员能够减少source scan的IO,而Runtime Filter不行,因为动态分区是基于分区进行过滤的。
  • Runtime Filter是可以基于非分区的字段作为join key,而动态分区裁剪必须是基于分区字段的join key

以上是关于SPARK最新特性Runtime Filtering(运行时过滤)以及与动态分区裁剪的区别的主要内容,如果未能解决你的问题,请参考以下文章

Spark Core Runtime分析: DAGScheduler, TaskScheduler, SchedulerBackend

结合Spark讲一下Flink的runtime

Spark学习摘记 —— Spark转化操作API归纳

[北京] Intel 招聘 Cloud Native Runtime Development Engineer

华为云技术分享快速理解spark-on-k8s中的external-shuffle-service

Pytorch基础教程33spark或dl模型部署(MLFlow/ONNX/Runtime/tensorflow serving)