Spark 数据集连接性能

Posted

技术标签:

【中文标题】Spark 数据集连接性能【英文标题】:Spark Dataset join performance 【发布时间】:2019-07-12 01:37:12 【问题描述】:

我收到了一个数据集,我需要join 它与另一个表。因此,我想到的最简单的解决方案是为另一个表创建第二个数据集并执行joinWith

    def joinFunction(dogs: Dataset[Dog]): Dataset[(Dog, Cat)] = 
      val cats: Dataset[Cat] = spark.table("dev_db.cat").as[Cat]
      dogs.joinWith(cats, ...)
    

我主要关心的是 spark.table("dev_db.cat"),因为感觉就像我们将所有 cat 数据称为

    SELECT * FROM dev_db.cat

然后在稍后阶段执行join。或者查询优化器会直接执行连接而不引用整个表吗?有没有更好的解决方案?

【问题讨论】:

【参考方案1】:

以下是针对您的案例的一些建议:

a. 如果您有wherefilterlimittake 等操作,请尝试在加入两个数据集之前应用它们。 Spark 无法下推这类过滤器,因此您必须自己尽可能减少目标记录的数量。 Here Spark 优化器的绝佳信息来源。

b. 尝试使用repartition 函数来共同定位数据集并最小化混洗数据。重新分区应基于参与join 的密钥,即:

dogs.repartition(1024, "key_col1", "key_col2")
dogs.join(cats, Seq("key_col1", "key_col2"), "inner")

c. 如果您确定较小的数据集可以放入内存,请尝试使用broadcast(或增加spark.broadcast.blockSize 的值)。这对 Spark 程序的性能有一定的提升,因为它将确保两个数据集在同一节点内共存。

如果您无法应用上述任何一项,则 Spark 无法知道应排除哪些记录,因此将扫描两个数据集中的所有可用行。

【讨论】:

通过包含 join 所涉及的键的重新分区,在大多数情况下可以保证键的协同定位 我认为 repartition/join 操作涉及很多部分,因此我认为仅指向代码的一部分并不容易。尽管重新分区是一个众所周知的操作,它正是这样做的,例如df.repartition(8, Seq("c1", "c2")) 会将 Hash(c1,c2) 的组合重新分配到 8 个分区中。然后对于连接,Spark 将尝试共同定位第二个数据帧的键 (c1, c2)。这就是为什么会发生改组的原因,因为 Spark 会尝试将所有数据放在正确的位置。 忘了我提到它。我在考虑初始负载。我正在报废 cmets 作为 repart 和 join 当然是这样。但是问题的主旨有所不同。【参考方案2】:

你需要做一个解释,看看是否使用了谓词下推。然后您就可以判断您的关注是否正确。

但是,现在一般来说,如果没有使用复杂数据类型和/或数据类型不匹配不明显,则会发生下推。您也可以通过简单的 createOrReplaceTempView 看到这一点。见https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/3741049972324885/4201913720573284/4413065072037724/latest.html

【讨论】:

以上是关于Spark 数据集连接性能的主要内容,如果未能解决你的问题,请参考以下文章

Java Spark:使用未知连接列名称连接的数据集的 Spark 错误解决方法

Spark 数据集连接和聚合列

Spark 数据集 Joinwith 错误:连接条件丢失或不重要

Spark 连接数据框和数据集

使用 Scala 在 Apache Spark 中连接不同 RDD 的数据集

使用空数据集的Spark SQL连接会导致更大的输出文件大小