超过 `spark.driver.maxResultSize` 而没有给驱动程序带来任何数据

Posted

技术标签:

【中文标题】超过 `spark.driver.maxResultSize` 而没有给驱动程序带来任何数据【英文标题】:Exceeding `spark.driver.maxResultSize` without bringing any data to the driver 【发布时间】:2017-03-13 22:49:10 【问题描述】:

我有一个执行大型连接的 Spark 应用程序

val joined = uniqueDates.join(df, $"start_date" <= $"date" && $"date" <= $"end_date")

然后将生成的 DataFrame 聚合为一个可能有 13k 行的数据帧。在加入过程中,作业失败并显示以下错误消息:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 78021 tasks is bigger than spark.driver.maxResultSize (2.0 GB)

这发生在之前没有设置spark.driver.maxResultSize,所以我设置了spark.driver.maxResultSize=2G。然后,我对连接条件稍作更改,错误再次出现。

编辑:在调整集群大小时,我还将 .coalesce(256) 中的 DataFrame 假定的分区数量翻了一番,达到了 .coalesce(512),所以我不能确定不是因为这个.

我的问题是,既然我没有向司机收集任何东西,为什么spark.driver.maxResultSize 在这里很重要?驱动程序的内存是否用于我不知道的连接中的某些内容?

【问题讨论】:

遇到同样的问题,请问您有什么进展吗? @user4601931 你能粘贴你正在运行的实际 scala 代码吗? val joined = uniqueDates.join(df, $"start_date" &lt;= $"date" &amp;&amp; $"date" &lt;= $"end_date") 行不会运行任何作业。您必须进行一些触发工作的转换。 您能检查一下joined 中有多少个分区吗?像joined.queryExecution.toRdd.getNumPartitions 这样的东西。我很好奇你为什么有78021 tasks。是否更好的解决方案是减少连接中数据集的分区数量? @JacekLaskowski 不幸的是,我已经没有这个项目的代码了,而且时间太长了,我已经忘记了它的大部分内容。抱歉,但感谢您对这个问题再次感兴趣。 @JacekLaskowski 我无法在此处显示查询计划,但它崩溃的阶段包含 +3000 个任务。很多 FileScanRDD 后跟 MapPartitionsRDD。然后很多UnionRDD。最后对所有联合的结果进行不同的操作。但是没有(广播)加入或收集......我当然可以看到为什么这个执行计划不理想,但不是spark.driver.maxResultSize进来的地方。当--deploy-mode cluster设置时没有崩溃。 【参考方案1】:

仅仅因为您没有明确收集任何东西并不意味着没有收集任何东西。由于问题发生在连接期间,最可能的解释是执行计划使用广播连接。在这种情况下,Spark 会先收集数据,然后再广播它。

取决于配置和管道:

确保spark.sql.autoBroadcastJoinThreshold 小于spark.driver.maxResultSize。 确保您不要force broadcast join 处理未知大小的数据。 虽然没有任何迹象表明这是这里的问题,但在使用 Spark ML 实用程序时要小心。其中一些(最显着的索引器)可以为驱动程序带来大量数据。

要确定广播是否确实是问题,请检查执行计划,如果需要,删除广播提示并禁用自动广播:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

【讨论】:

我想 Spark 应该足够聪明,不会使用超过 maxResultSize 的广播连接,除非你明确告诉它,或者除非你愚蠢地将 autoBroadcastJoinThreshold 更改为更高的值? 如何禁用自动广播? @thentangler 设置 spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) 禁用自动广播【参考方案2】:

理论上,异常并不总是与客户数据相关。

关于任务执行结果的技术信息以序列化的形式发送到驱动节点,这些信息可能会占用比阈值更多的内存。

证明: 错误消息位于 org.apache.spark.scheduler.TaskSetManager#canFetchMoreResults

val msg = s"Total size of serialized results of $calculatedTasks tasks " +

在 org.apache.spark.scheduler.TaskResultGetter#enqueueSuccessfulTask​​ 中调用的方法

        val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match 
        case directResult: DirectTaskResult[_] =>
          if (!taskSetManager.canFetchMoreResults(serializedData.limit())) 
            return
          

如果任务数量很大,可能会发生上述异常。

【讨论】:

我很确定您正在做某事,但您能否详细说明该技术信息是什么以及该段代码如何证明它?我对 Spark 的内部结构不是很熟悉,所以对我来说,TaskResultGetter 的目的似乎是将任务执行给驱动程序的计算的实际结果返回。这只对collecttake、也许count、...等操作是必需的。 如何检查:在本地模式下只运行small join debug,在指定位置设置断点,出现一些技术信息(至少是一些累加器)

以上是关于超过 `spark.driver.maxResultSize` 而没有给驱动程序带来任何数据的主要内容,如果未能解决你的问题,请参考以下文章

什么更快:SUM 超过 NULL 还是超过 0?

当一个 pod 资源限制没有超过,而单个容器的资源限制超过了,会发生啥?

确定 95% 的请求所用时间不超过 1 秒,如果超过则自动停止测试

内容不超过屏幕,footer固定在底部,超过时被撑出屏幕

c++题目:求出现次数超过一半的数

SQL Server 2005:哪个更快?条件超过 2 列或超过 2 行?