广播哈希连接 - 迭代

Posted

技术标签:

【中文标题】广播哈希连接 - 迭代【英文标题】:Broadcast hash join - Iterative 【发布时间】:2018-12-14 17:21:19 【问题描述】:

当我们有一个小到足以放入内存的数据帧时,我们会在 Spark 中使用广播哈希连接。当小数据框的大小低于spark.sql.autoBroadcastJoinThreshold 我对此有几个问题。

我们提示为广播的小数据帧的生命周期是什么?它将在记忆中保留多久?我们如何控制它?

例如,如果我使用广播哈希连接将一个大数据帧和一个小数据帧连接了两次。当第一次连接执行时,它会将小数据帧广播到工作节点并执行连接,同时避免大数据帧数据的混洗。

我的问题是,执行者会保留广播数据帧的副本多长时间?它会保留在内存中直到会话结束吗?或者一旦我们采取任何行动,它就会被清除。我们可以控制或清除它吗?或者我只是想错了方向......

【问题讨论】:

【参考方案1】:

至少在 Spark 2.4.0 中,您的问题的答案是数据帧将保留在驱动程序进程的内存中,直到 SparkContext 完成,即,直到您的应用程序结束。

广播连接实际上是使用广播变量实现的,但是在使用 DataFrame API 时,您无法访问底层广播变量。 Spark 本身在内部使用这个变量后并不会销毁它,所以它只是一直存在。

具体来说,如果您查看 BroadcastExchangeExec (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala) 的代码,您可以看到它创建了一个私有变量 relationFuture,其中保存了 Broadcast 变量。这个私有变量只在这个类中使用。作为用户,您无法访问它并对其调用 destroy ,并且在当前实现中,Spark 没有任何地方为您调用它。

【讨论】:

..谢谢..这个现在有点意义了。 这个 Spark 错误报告很好地讨论了这个问题。 issues.apache.org/jira/browse/… 非常感谢。您对迭代广播哈希连接有任何想法吗?我试图编写代码.. 没有成功。 我不知道,尽管我可能会在下个月的某个时候再次讨论这个问题并重新开始工作。对于我的用例,我们现在解决了这个问题。 谢谢。如果您有任何见解,请告诉我。我会很感激你的。 ?【参考方案2】:

这里的想法是在加入之前创建广播变量以轻松控制它。没有它,您将无法控制这些变量 - spark 为您完成。

例子:

from pyspark.sql.functions import broadcast

sdf2_bd = broadcast(sdf2)
sdf1.join(sdf2_bd, sdf1.id == sdf2_bd.id)

适用于所有广播变量(在连接中自动创建或手动创建):

    广播数据仅发送到包含需要它的执行程序的节点。 广播数据存储在内存中。如果没有足够的可用内存,则使用磁盘。 使用完广播变量后,您应该destroy 释放内存。

【讨论】:

vikrant 正在谈论连接而不是广播变量的广播提示 @ArnonRotem-Gal-Oz 广播加入利用下面的广播变量,因此适用相同的规则 是,除了第 3 步 - 您没有要销毁的变量句柄,因此您无法控制其生命周期 @Arnon。我同意你的看法 @luminousmen 你错了——在 sdf2_bd 上调用 unpersist() (在你的例子中)没有效果,因为它在数据帧意义上没有持久化(你可以让它持久化——你可以调用 sdf2_bd .cache() 两次,你会看到第二次会给你一个警告,它已经被缓存了(但不是第一次)。在数据帧上调用 unpersist 会释放这个。当你用广播标记数据帧时,它只会标记它带有一个提示,当 spark 建立连接时会解决。但你无法控制生命周期【参考方案3】:

在我对广播选项进行了一些研究后,这里有一些额外的发现。

让我们考虑下一个例子:

import org.apache.spark.sql.functions.lit, broadcast

val data = Seq(
(2010, 5, 10, 1520, 1),
(2010, 5, 1, 1520, 1),
(2011, 11, 25, 1200, 2),
(2011, 11, 25, 1200, 1),
(2012, 6, 10, 500, 2),
(2011, 11, 5, 1200, 1),
(2012, 6, 1, 500, 2),
(2011, 11, 2, 200, 2))

val bigDF = data
            .toDF("Year", "Month", "Day", "SalesAmount", "StoreNumber")
            .select("Year", "Month", "Day", "SalesAmount")

val smallDF = data
            .toDF("Year", "Month", "Day", "SalesAmount", "StoreNumber")
            .where($"Year" === lit(2011))
            .select("Year", "Month", "Day", "StoreNumber")

val partitionKey = Seq("Year", "Month", "Day")
val broadcastedDF = broadcast(smallDF)
val joinedDF = bigDF.join(broadcastedDF, partitionKey)

正如所料,joinedDF 的执行计划应该是下一个:

== Physical Plan ==
*(1) Project [Year#107, Month#108, Day#109, SalesAmount#110, StoreNumber#136]
+- *(1) BroadcastHashJoin [Year#107, Month#108, Day#109], [Year#132, Month#133, Day#134], Inner, BuildRight, false
   :- LocalTableScan [Year#107, Month#108, Day#109, SalesAmount#110]
   +- BroadcastExchange HashedRelationBroadcastMode(ArrayBuffer(input[0, int, false], input[1, int, false], input[2, int, false]))
      +- LocalTableScan [Year#132, Month#133, Day#134, StoreNumber#136]

如果没有显式广播,这可能也是一样的,因为 smallDF 非常小,它适合默认的广播大小 (10MB)。

现在,我希望我能够从joinedDF的依赖项中访问广播的数据帧,因此我尝试通过打印出joinedDF和broadcastedDF的所有依赖项的rdd.id来访问广播df。功能:

import org.apache.spark.rdd._

def printDependency(rdd : RDD[_], indentation: String = "") : Unit = 
      if (rdd == null)
        return;

      println(s"$indentation Partition Id: $rdd.id ")
      rdd.dependencies.foreach  d => printDependency(d.rdd, s"$indentation ")


println(s"Broadcasted id: $broadcastedDF.rdd.id")
printDependency(joinedDF.rdd)

//Output
//
// Broadcasted id: 164
//
// Partition Id: 169 
//   Partition Id: 168 
//    Partition Id: 167 
//     Partition Id: 166 
//      Partition Id: 165

令人惊讶的是,我意识到广播的数据帧不包含/不被认为是joinedDF的DAG的一部分,这是有道理的,因为一旦我们广播了smallDF的实例,我们就不想再跟踪它的变化了,当然Spark 意识到了这一点。

释放广播数据集的一种方法是使用unpersist,如下所示:

val broadcastedDF = smallDF.hint("broadcast")
val joinedDF = bigDF.join(broadcastedDF, partitionKey)

broadcastedDF.unpersist()

第二种方法是直接使用 sparkContext API,如下所示:

val broadcastedDF = spark.sparkContext.broadcast(smallDF)
val joinedDF = bigDF.join(broadcastedDF.value, partitionKey)
broadcastedDF.destroy() // or unpersist for async

虽然这会删除广播实例本身,而不是底层的 smallDF。最后一个将被标记为删除,并且不会立即删除,具体取决于它是否有其他引用。这将与ContextCleaner 类结合使用,更具体地说,将由keepCleaning 方法控制,该方法试图在程序执行期间或上下文结束时异步删除不再需要的 RDD、累加器、随机播放和检查点(如前所述)。

第二种方法(在我看来更安全)删除不再使用的joinedDF依赖项是通过方法df.persist()、df.checkpoint()、rdd.persist()和rdd.checkpoint()。所有提到的方法最终都会调用ContextCleaner 类的 registerRDDForCleanup 或 registerForCleanup 方法,以清理它们的父依赖项。

出现的一个明显问题是使用哪一个以及有什么区别?有两个主要区别,首先使用checkpoint(),您可以通过从同一检查点目录加载数据,在第二个作业中重用输出数据。其次,dataframe API 会在保存数据时应用额外的优化,RDD API 中没有这样的功能。

所以最后的结论是,您可以通过调用 df.persist(), df.checkpoint, rdd.persist() and rdd.checkpoint() 之一来修剪 RDD 祖先的数据。 修剪将在作业执行期间进行,而不仅仅是在终止上下文时。最后但并非最不重要的一点是,您不应忘记所有先前的方法将被延迟评估,因此仅在之后发生执行一个动作。

更新:

似乎立即为数据帧/RDD 强制释放内存的最有效方法是调用unpersist,正如here 所讨论的那样。然后代码将稍微更改为:

val broadcastedDF = smallDF.hint("broadcast")
val joinedDF = bigDF.join(broadcastedDF, partitionKey)
broadcastedDF.unpersist()

【讨论】:

下面是参考链接..听起来很有趣....databricks.com/session/… 我检查了提到的 Vikrant 视频,尽管在我看来 ***.com/questions/53524062/efficient-pyspark-join/… 您已经实现了类似的场景,不同之处在于您必须根据 par_id 从 EMP_DF2 广播数据,然后将其加入内连接_EMP。此外,迭代广播适用于数据倾斜场景,我相信它不会在所有情况下都有效,因为您必须使用磁盘存储并替换默认连接行为

以上是关于广播哈希连接 - 迭代的主要内容,如果未能解决你的问题,请参考以下文章

为啥不将哈希函数迭代 10,000,000 次? [复制]

迭代液体模板中的哈希

迭代时无意中将键添加到哈希

使用数组中的键迭代哈希,并对结果求和

是否有解决方案绕过“无法在迭代期间将新密钥添加到哈希(RuntimeError)”?

RuntimeError:在 Rack 中的迭代期间无法将新密钥添加到哈希中