Spark 数据集连接性能
Posted
技术标签:
【中文标题】Spark 数据集连接性能【英文标题】:Spark Dataset join performance 【发布时间】:2019-07-12 01:37:12 【问题描述】:我收到了一个数据集,我需要join
它与另一个表。因此,我想到的最简单的解决方案是为另一个表创建第二个数据集并执行joinWith
。
def joinFunction(dogs: Dataset[Dog]): Dataset[(Dog, Cat)] =
val cats: Dataset[Cat] = spark.table("dev_db.cat").as[Cat]
dogs.joinWith(cats, ...)
我主要关心的是 spark.table("dev_db.cat")
,因为感觉就像我们将所有 cat
数据称为
SELECT * FROM dev_db.cat
然后在稍后阶段执行join
。或者查询优化器会直接执行连接而不引用整个表吗?有没有更好的解决方案?
【问题讨论】:
【参考方案1】:以下是针对您的案例的一些建议:
a. 如果您有where
、filter
、limit
、take
等操作,请尝试在加入两个数据集之前应用它们。 Spark 无法下推这类过滤器,因此您必须自己尽可能减少目标记录的数量。 Here Spark 优化器的绝佳信息来源。
b. 尝试使用repartition
函数来共同定位数据集并最小化混洗数据。重新分区应基于参与join
的密钥,即:
dogs.repartition(1024, "key_col1", "key_col2")
dogs.join(cats, Seq("key_col1", "key_col2"), "inner")
c. 如果您确定较小的数据集可以放入内存,请尝试使用broadcast
(或增加spark.broadcast.blockSize
的值)。这对 Spark 程序的性能有一定的提升,因为它将确保两个数据集在同一节点内共存。
如果您无法应用上述任何一项,则 Spark 无法知道应排除哪些记录,因此将扫描两个数据集中的所有可用行。
【讨论】:
通过包含 join 所涉及的键的重新分区,在大多数情况下可以保证键的协同定位 我认为 repartition/join 操作涉及很多部分,因此我认为仅指向代码的一部分并不容易。尽管重新分区是一个众所周知的操作,它正是这样做的,例如df.repartition(8, Seq("c1", "c2"))
会将 Hash(c1,c2) 的组合重新分配到 8 个分区中。然后对于连接,Spark 将尝试共同定位第二个数据帧的键 (c1, c2)。这就是为什么会发生改组的原因,因为 Spark 会尝试将所有数据放在正确的位置。
忘了我提到它。我在考虑初始负载。我正在报废 cmets 作为 repart 和 join 当然是这样。但是问题的主旨有所不同。【参考方案2】:
你需要做一个解释,看看是否使用了谓词下推。然后您就可以判断您的关注是否正确。
但是,现在一般来说,如果没有使用复杂数据类型和/或数据类型不匹配不明显,则会发生下推。您也可以通过简单的 createOrReplaceTempView 看到这一点。见https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/3741049972324885/4201913720573284/4413065072037724/latest.html
【讨论】:
以上是关于Spark 数据集连接性能的主要内容,如果未能解决你的问题,请参考以下文章
Java Spark:使用未知连接列名称连接的数据集的 Spark 错误解决方法
Spark 数据集 Joinwith 错误:连接条件丢失或不重要