如何在加入(广播)和使用 Spark 收集之间进行选择

Posted

技术标签:

【中文标题】如何在加入(广播)和使用 Spark 收集之间进行选择【英文标题】:How to choose between join(broadcast) and collect with Spark 【发布时间】:2018-02-02 13:21:01 【问题描述】:

我使用的是 Spark 2.2.1。

我有一个小的 DataFrame(小于 1M),我有一个大 DataFrame 上的计算,需要这个小 DataFrame 来计算 UDF 中的一列。

关于性能的最佳选择是什么

广播这个DF是不是更好(我不知道Spark是否会将笛卡尔做内存)。

bigDF.crossJoin(broadcast(smallDF))
     .withColumn(udf("$colFromSmall", $"colFromBig"))

或收集它并直接在udf中使用small

val small = smallDF.collect()
bigDF.withColumn(udf($"colFromBig"))

【问题讨论】:

【参考方案1】:

两者都会先collect 数据,所以在内存占用方面没有区别。所以选择应该由逻辑决定:

如果您可以比默认执行计划做得更好并且不想创建自己的执行计划,udf 可能是更好的方法。 如果它只是一个笛卡尔坐标,并且需要后续的explode - 虽然会消失 - 只需使用前一个选项。

正如in the comments T.Gawęda 在第二种情况下建议的那样,您可以使用广播

val small = spark.spark.broadcast(smallDF.collect())
bigDF.withColumn(udf($"colFromBig"))

如果重用udf,它可能会提供一些性能改进。

【讨论】:

交叉连接将为第一个 DF 中的每一行从 smallDF 制作一个或多个副本。内存占用会更高,在我切换到第二种方法之前,我得到了很多 OOM。当然,只有在 UDF 中需要时,手动广播才会更好 顺便说一句。请建议手动广播而不是直接参考:) @T.Gawęda 我会专注于明确的论点,我不确定广播在这里是否有益,但完成了:)

以上是关于如何在加入(广播)和使用 Spark 收集之间进行选择的主要内容,如果未能解决你的问题,请参考以下文章

您可以使用 SparkR 进行广播加入吗?

Spark中自定义累加器和广播变量的使用

Spark 广播连接将数据加载到驱动程序

Spark如何仅在分区内加入

sparksql缓存表能做广播变量吗

如何在不收集的情况下将 RDD、Dataframe 或 Dataset 直接转换为广播变量?