Spark 广播连接将数据加载到驱动程序

Posted

技术标签:

【中文标题】Spark 广播连接将数据加载到驱动程序【英文标题】:Spark broadcast join loads data to driver 【发布时间】:2016-10-28 10:47:10 【问题描述】:

据我所知,Spark 执行广播连接时,它首先将最小(广播)RDD 收集到驱动程序以从中生成广播变量,然后才将其上传到每个目标节点。

广播 RDD > spark.driver.memory 有时会导致驱动内存溢出。

问题:为什么它会以这种方式工作?只在目标节点之间打乱广播数据效率更高,因为打乱的数据量是相同的,但我们可以避免驱动程序溢出。

示例:假设您有 3 个节点和 1 GB 数据要在每个节点上广播,每个节点的吞吐量为 1 GB/秒。

Spark 方法 - 每个节点必须将其数据块 (1gb) 上传到驱动程序并下载广播变量 (3 * 1g = 3gb),因此每个节点总共应传输 4gb,需要 4s。

Shuffle 方法 - 一个节点必须将 1gb 上传到其他 2 个节点并下载 这些 1gb。同样,总量为 4 GB,需要 4 秒。

【问题讨论】:

你怎么能确定内存溢出来自那个? @eliasah 因为我多次看到驱动程序内存不足溢出,堆栈跟踪显然表明原因是从广播 RDD 创建广播变量。唯一有帮助的是增加 spark.driver.memory OOME 的原因有很多,除非我们真的看起来像“一些代码”,否则我们无法缩小范围 @eliasah 很容易重现:将 spark.driver.memory 设置为 1g,然后尝试:df1.join(broadcast(df2)) 其中 df1 为 100 gb,df2 为 2 gb。您将在驱动程序 100% 上获得 OOM 你不需要广播一个大的DataFrame。你读过这个***.com/a/35257145/3415409 吗? 【参考方案1】:

首先广播连接用于连接一个大表和一个极小的表。

那么如果使用shuffle而不是将small df(table)收集回driver然后广播,你只注意到small df被shuffle,但实际上big df也同时被shuffle,这是相当的时间消耗。

【讨论】:

【参考方案2】:

这是不正确的。 Spark 不使用广播进行 RDD 连接。

Spark 可以将广播用于DataFrame 连接,但它不应用于处理大型对象。最好使用标准的 HashJoin。

【讨论】:

确保 Spark 不使用广播进行 RDD 连接。我的意思是,当 Spark 在内部执行广播连接(对于 DataFrames)时,它会将广播 DataFrame 的 RDD[Row] 移动到 master。 -1 有效吗?

以上是关于Spark 广播连接将数据加载到驱动程序的主要内容,如果未能解决你的问题,请参考以下文章

Spark 广播替代方案

sparksql缓存表能做广播变量吗

广播哈希连接 - 迭代

如何从驱动程序将不适合驱动程序内存的数据加载到 Spark 独立集群中?

Spark的Join连接

带有广播连接的 Spark 流式传输