The New Spark Feature: Runtime Filtering, and How It Differs from Dynamic Partition Pruning
Posted by 鸿乃江边鸟
Background
This article is based on Spark 3.3.0.
Among the features called out in the latest Spark release notes, the first notable one is row-level Runtime Filtering. Let's take a closer look.
Analysis
Going straight to the corresponding Jira ticket, SPARK-32268: in the TPC benchmarks it cites, the Bloom filter yields a clear performance gain once row counts get large. The most important reference is the design doc, Row-level Runtime Filters in Spark, which makes two key points:
- A runtime filter comes in two flavors: an In filter (which is rewritten into a semi join) and a Bloom filter
- The Bloom filter is sized by two parameters: n_items (how many rows it is built for) and n_bits (how many bits it uses); these surface as the configs sketched below
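A rough sketch of where those two knobs surface, assuming the Spark 3.3 SQLConf entry names below (the defaults shown are my reading of SQLConf and worth verifying against your version):

```scala
// Assumed SQLConf entries controlling the Bloom filter's size in Spark 3.3:
// n_items maps to expectedNumItems/maxNumItems, n_bits to numBits/maxNumBits.
spark.conf.set("spark.sql.optimizer.runtime.bloomFilter.expectedNumItems", 1000000L)
spark.conf.set("spark.sql.optimizer.runtime.bloomFilter.numBits", 8388608L)
spark.conf.set("spark.sql.optimizer.runtime.bloomFilter.maxNumItems", 4000000L)
spark.conf.set("spark.sql.optimizer.runtime.bloomFilter.maxNumBits", 67108864L)
```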
Code Implementation
The feature is driven by two main rules, InjectRuntimeFilter and RewritePredicateSubquery:
Batch("InjectRuntimeFilter", FixedPoint(1),
InjectRuntimeFilter,
RewritePredicateSubquery) :+
The first rule, InjectRuntimeFilter, decides whether to perform the runtime filter rewrite based on the spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled and spark.sql.optimizer.runtime.bloomFilter.enabled settings (both are off by default).
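As a minimal sketch of opting in (assuming a live `spark` session; both flags default to false in 3.3.0):

```scala
// Enable the In-filter (semi-join reduction) path and/or the Bloom filter path.
spark.conf.set("spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled", "true")
spark.conf.set("spark.sql.optimizer.runtime.bloomFilter.enabled", "true")
```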
```scala
private def tryInjectRuntimeFilter(plan: LogicalPlan): LogicalPlan = {
  var filterCounter = 0
  val numFilterThreshold = conf.getConf(SQLConf.RUNTIME_FILTER_NUMBER_THRESHOLD)
  plan transformUp {
    case join @ ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, _, _, left, right, hint) =>
      var newLeft = left
      var newRight = right
      (leftKeys, rightKeys).zipped.foreach((l, r) => {
        // Check if:
        // 1. There is already a DPP filter on the key
        // 2. There is already a runtime filter (Bloom filter or IN subquery) on the key
        // 3. The keys are simple cheap expressions
        if (filterCounter < numFilterThreshold &&
          !hasDynamicPruningSubquery(left, right, l, r) &&
          !hasRuntimeFilter(newLeft, newRight, l, r) &&
          isSimpleExpression(l) && isSimpleExpression(r)) {
          val oldLeft = newLeft
          val oldRight = newRight
          if (canPruneLeft(joinType) && filteringHasBenefit(left, right, l, hint)) {
            newLeft = injectFilter(l, newLeft, r, right)
          }
          // Did we actually inject on the left? If not, try on the right
          if (newLeft.fastEquals(oldLeft) && canPruneRight(joinType) &&
            filteringHasBenefit(right, left, r, hint)) {
            newRight = injectFilter(r, newRight, l, left)
          }
          if (!newLeft.fastEquals(oldLeft) || !newRight.fastEquals(oldRight)) {
            filterCounter = filterCounter + 1
          }
        }
      })
      join.withNewChildren(Seq(newLeft, newRight))
  }
}
```
Several preconditions must hold before a runtime filter is applied (a hypothetical query shape follows the list):
- The number of injected runtime filters has not exceeded the threshold spark.sql.optimizer.runtimeFilter.number.threshold (default 10).
- The key has no dynamic partition pruning subquery and no runtime filter on it yet, and both join keys are simple, cheap expressions (keep it simple first).
- On the filter application side of the join, the filter can be pushed down through joins and aggregates.
- The filter creation side of the join has a selective predicate (used to filter the application side's data).
- The application side's scan size exceeds spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold, default 10GB (otherwise filtering would not pay off).
- The join is a shuffle join, or a broadcast join that contains a shuffle; only in these cases can the filter be injected.
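To make these conditions concrete, here is a hypothetical query shape that satisfies them (the tables and columns are made up for illustration): a large, unpartitioned fact table joined to a small dimension table carrying a selective predicate.

```scala
// `fact` is the filter application side (its scan must exceed the 10GB
// threshold by default); `dim` is the filter creation side, made selective
// by the predicate on `category`.
val pruned = spark.sql("""
  SELECT f.*
  FROM fact f
  JOIN dim d ON f.key = d.key  -- runtime filter on f.key, built from d.key
  WHERE d.category = 'rare'    -- selective predicate on the creation side
""")
```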
Note: looking at the code alone, the last condition above doesn't reveal why a broadcast join qualifies; wouldn't a hash join work? The analysis below explains:
```scala
private def injectFilter(
    filterApplicationSideExp: Expression,
    filterApplicationSidePlan: LogicalPlan,
    filterCreationSideExp: Expression,
    filterCreationSidePlan: LogicalPlan): LogicalPlan = {
  require(conf.runtimeFilterBloomFilterEnabled || conf.runtimeFilterSemiJoinReductionEnabled)
  if (conf.runtimeFilterBloomFilterEnabled) {
    injectBloomFilter(
      filterApplicationSideExp,
      filterApplicationSidePlan,
      filterCreationSideExp,
      filterCreationSidePlan
    )
  } else {
    injectInSubqueryFilter(
      filterApplicationSideExp,
      filterApplicationSidePlan,
      filterCreationSideExp,
      filterCreationSidePlan
    )
  }
}
```
When the Bloom filter path is not enabled, an InSubquery node is inserted instead, and there is an important check inside:
```scala
private def injectInSubqueryFilter(
    filterApplicationSideExp: Expression,
    filterApplicationSidePlan: LogicalPlan,
    filterCreationSideExp: Expression,
    filterCreationSidePlan: LogicalPlan): LogicalPlan = {
  require(filterApplicationSideExp.dataType == filterCreationSideExp.dataType)
  val actualFilterKeyExpr = mayWrapWithHash(filterCreationSideExp)
  val alias = Alias(actualFilterKeyExpr, actualFilterKeyExpr.toString)()
  val aggregate = Aggregate(Seq(alias), Seq(alias), filterCreationSidePlan)
  if (!canBroadcastBySize(aggregate, conf)) {
    // Skip the InSubquery filter if the size of `aggregate` is beyond broadcast join threshold,
    // i.e., the semi-join will be a shuffled join, which is not worthwhile.
    return filterApplicationSidePlan
  }
  val filter = InSubquery(Seq(mayWrapWithHash(filterApplicationSideExp)),
    ListQuery(aggregate, childOutputs = aggregate.output))
  Filter(filter, filterApplicationSidePlan)
}
```
The key check is `if (!canBroadcastBySize(aggregate, conf))`: if the aggregate built on the filter creation side cannot be broadcast at runtime, the rewrite is skipped. Without the broadcast, the semi join would become a shuffled join, which is not worthwhile. The InSubquery is later rewritten into a semi join by the RewritePredicateSubquery rule (and that semi join can subsequently become a broadcast join).
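Conceptually, sticking with the hypothetical tables from earlier and ignoring the mayWrapWithHash wrapping, the rewrite adds an IN-subquery predicate over a de-duplicating aggregate of the creation-side key:

```scala
// Roughly what the plan computes after injectInSubqueryFilter:
val rewritten = spark.sql("""
  SELECT f.*
  FROM fact f
  WHERE f.key IN (SELECT d.key FROM dim d WHERE d.category = 'rare')
""")
// RewritePredicateSubquery then turns the IN predicate into a LEFT SEMI JOIN,
// which can run as a broadcast semi join because the aggregate over `dim`
// already passed the canBroadcastBySize check.
```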
When the Bloom filter is enabled, injectBloomFilter runs instead:
```scala
private def injectBloomFilter(
    filterApplicationSideExp: Expression,
    filterApplicationSidePlan: LogicalPlan,
    filterCreationSideExp: Expression,
    filterCreationSidePlan: LogicalPlan): LogicalPlan = {
  // Skip if the filter creation side is too big
  if (filterCreationSidePlan.stats.sizeInBytes > conf.runtimeFilterCreationSideThreshold) {
    return filterApplicationSidePlan
  }
  val rowCount = filterCreationSidePlan.stats.rowCount
  val bloomFilterAgg =
    if (rowCount.isDefined && rowCount.get.longValue > 0L) {
      new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideExp)),
        Literal(rowCount.get.longValue))
    } else {
      new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideExp)))
    }
  val aggExp = AggregateExpression(bloomFilterAgg, Complete, isDistinct = false, None)
  val alias = Alias(aggExp, "bloomFilter")()
  val aggregate =
    ConstantFolding(ColumnPruning(Aggregate(Nil, Seq(alias), filterCreationSidePlan)))
  val bloomFilterSubquery = ScalarSubquery(aggregate, Nil)
  val filter = BloomFilterMightContain(bloomFilterSubquery,
    new XxHash64(Seq(filterApplicationSideExp)))
  Filter(filter, filterApplicationSidePlan)
}
```
Here the creation side is checked against spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold (default 10MB); if it is larger, the rewrite is skipped, because with too many items the Bloom filter's false positive rate climbs and the filter loses its effectiveness.
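This is just the standard Bloom filter trade-off: with m bits, k hash functions, and n inserted items, the false positive rate is approximately p ≈ (1 − e^(−kn/m))^k, so once the bit budget is capped, every additional creation-side row pushes the false positive rate upward and the filter rejects fewer application-side rows.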
The rule builds a BloomFilterAggregate, a BloomFilterMightContain, and a ScalarSubquery. The ScalarSubquery is eventually planned by PlanSubqueries into an execution.ScalarSubquery (a SubqueryExec), which collects the Bloom filter to the driver so that BloomFilterMightContain can evaluate against it.
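A quick way to see the whole pipeline fire, with the caveat that the plan-node names below (bloom_filter_agg, might_contain) are my recollection of the Spark 3.3 expression names and the exact strings may differ:

```scala
// With spark.sql.optimizer.runtime.bloomFilter.enabled=true, inspect the plan
// to confirm the rewrite happened (hypothetical tables as before).
val df = spark.sql("""
  SELECT f.* FROM fact f JOIN dim d ON f.key = d.key WHERE d.category = 'rare'
""")
df.explain(true)
// Expect a bloom_filter_agg aggregate on the `dim` side (inside a scalar
// subquery) and a might_contain(..., xxhash64(f.key)) predicate over `fact`.
```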
Differences from Dynamic Partition Pruning
- For dynamic partition pruning, the pruned side of the join must be partitioned, the other side must have a filtering condition before the exchange, and by default pruning only happens when there is a broadcast join. Runtime Filter has no such structural restrictions, but its applicability conditions (listed above) are stricter.
- Dynamic partition pruning can cut source-scan IO, while Runtime Filter cannot, because partition pruning filters whole partitions before the scan.
- Runtime Filter can use a non-partition column as the join key, whereas dynamic partition pruning requires the join key to be a partition column (see the contrast sketched below).
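An illustrative contrast, again with hypothetical tables:

```scala
// DPP: `fact_p` is partitioned by `dt`; because the join key is the partition
// column, whole partitions of `fact_p` can be skipped at scan time.
spark.sql("SELECT * FROM fact_p f JOIN dim d ON f.dt = d.dt WHERE d.flag = 1")

// Runtime filter: `key` is an ordinary column, so rows of `fact` are still
// read from disk, then discarded early by the Bloom/IN predicate.
spark.sql("SELECT * FROM fact f JOIN dim d ON f.key = d.key WHERE d.flag = 1")
```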