Scala Spark isin 广播列表
Posted
技术标签:
【中文标题】Scala Spark isin 广播列表【英文标题】:Scala Spark isin broadcast list 【发布时间】:2020-04-08 22:39:29 【问题描述】:我正在尝试尽可能优化地执行 isin 过滤器。有没有办法使用 Scala API 广播 collList?
编辑:我不是在寻找替代方案,我知道它们,但我需要 isin 以便我的 RelationProviders 会下推这些值。
val collList = collectedDf.map(_.getAs[String]("col1")).sortWith(_ < _)
//collList.size == 200.000
val retTable = df.filter(col("col1").isin(collList: _*))
我传递给“isin”方法的列表有多达 200.000 个独特的元素。
我知道这看起来不是最好的选择,并且加入听起来更好,但我需要将这些元素推入过滤器,在阅读 (我的存储是 Kudu,但它也适用于 HDFS+Parquet,基础数据太大,查询只能处理大约 1% 的数据),我已经测量了所有内容,它为我节省了大约 30 分钟的执行时间:)。另外,如果 isin 大于 200.000,我的方法已经很小心了。
我的问题是,我收到了一些 Spark“任务太大”(每个任务约 8mb)警告,一切正常,所以没什么大不了的,但我希望删除它们并进行优化。
我已经尝试过,它什么也没做,因为我仍然收到警告(因为广播的 var 在 Scala 中得到解析并传递给我猜的 vargargs):
val collList = collectedDf.map(_.getAs[String]("col1")).sortWith(_ < _)
val retTable = df.filter(col("col1").isin(sc.broadcast(collList).value: _*))
还有这个不能编译:
val collList = collectedDf.map(_.getAs[String]("col1")).sortWith(_ < _)
val retTable = df.filter(col("col1").isin(sc.broadcast(collList: _*).value))
而这个不起作用(任务太大仍然出现)
val broadcastedList=df.sparkSession.sparkContext.broadcast(collList.map(lit(_).expr))
val filterBroadcasted=In(col("col1").expr, collList.value)
val retTable = df.filter(new Column(filterBroadcasted))
关于如何广播这个变量的任何想法? (允许黑客攻击)。任何允许过滤器下推的 isin 替代方案也是有效的我见过有人在 PySpark 上这样做,但 API 不一样。
PS:不可能对存储进行更改,我知道分区(已经分区,但不是按该字段)等可能会有所帮助,但用户输入完全是随机的,并且数据被访问并更改了我的许多客户端。
【问题讨论】:
【参考方案1】:在这种情况下,我会选择数据帧广播哈希连接,而不是广播变量。
准备一个带有collectedDf("col1")
集合列表的数据框,您要使用isin
进行过滤,然后
使用 2 个数据帧之间的连接来过滤匹配的行。
我认为它比isin
更有效,因为您有 200k 条目要过滤。 spark.sql.autobroadcastjointhreshhold
是您需要设置适当大小的属性(默认为 10mb)。 AFAIK 根据您的要求,您可以使用到 200mb 或 3oomb。
see this BHJ Explanation of how it works
延伸阅读Spark efficiently filtering entries from big dataframe that exist in a small dataframe
【讨论】:
嗨 Ram,正如我在问题中所说,虽然这个答案对常见情况有效,但当输入数据大于 200k 个元素时我会这样做(实际上我的输入数据是一个数据框) ,但是对于这个具体案例,进行左连接(从而读取整个表)需要 30 分钟,200k 个元素(通常更少,大约 50k) isin 需要大约 2 分钟来“广播” isin 和大约 3-5 分钟来阅读。我无法读取所需的所有列,并且使用另一个下推过滤器无法充分减少数据 这里的重点是谓词下推,我需要将谓词推送到服务器(Kudu),所以我不需要检索比实际数据多 10-20 倍的数据寻找。如果有办法强制leftsemi joins(这是我应该在这里使用的)下推它也会起作用。 我对 kudu 及其工作原理不太了解。所以我无法理解你的 PP 部分,但一般来说,如果你使用 parquet 并且如果你使用 where 条件,它将使用谓词下推。也许你想喜欢我不确定。 是的,问题是为了使用“where 条件”,Spark 必须发送大任务(其中包含大 where 列表的副本,最多 200.000 个元素),看起来 Spark 是并非旨在允许在这些列表上进行广播(我也很惊讶,我认为限制将是 ~1000 个元素,但它可以正常工作到 400k [没有测试更多,因为存在大约 400.000 个元素我没有得到性能改进对于我的用例]【参考方案2】:我会带着大任务离开,因为我在我的程序中只使用了两次(但节省了很多时间),而且我负担得起,但如果其他人急需它......好吧,这似乎是路径。
我发现有大数组下推的最佳替代方案:
-
更改您的关系提供程序,以便在按下过滤器时广播大列表 在过滤器中,这可能会留下一些广播垃圾,但是...,只要您的应用程序没有流式传输,它不应该是有问题,或者您可以保存在全局列表中并在一段时间后清除它们
在 Spark 中添加一个过滤器(我在 https://issues.apache.org/jira/browse/SPARK-31417 写了一些东西),它允许将广播下推一直到您的关系提供者。您必须添加自定义谓词,然后实现自定义“下推”(您可以通过添加新规则来完成此操作),然后重写您的 RDD/Relation 提供程序,以便它可以利用变量被广播的事实。
阅读后使用 coalesce(X) 减少任务数量,有时可以工作,具体取决于 RelationProvider/RDD 的实现方式。
【讨论】:
以上是关于Scala Spark isin 广播列表的主要内容,如果未能解决你的问题,请参考以下文章