超过 `spark.driver.maxResultSize` 而没有给驱动程序带来任何数据
Posted
技术标签:
【中文标题】超过 `spark.driver.maxResultSize` 而没有给驱动程序带来任何数据【英文标题】:Exceeding `spark.driver.maxResultSize` without bringing any data to the driver 【发布时间】:2017-03-13 22:49:10 【问题描述】:我有一个执行大型连接的 Spark 应用程序
val joined = uniqueDates.join(df, $"start_date" <= $"date" && $"date" <= $"end_date")
然后将生成的 DataFrame 聚合为一个可能有 13k 行的数据帧。在加入过程中,作业失败并显示以下错误消息:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 78021 tasks is bigger than spark.driver.maxResultSize (2.0 GB)
这发生在之前没有设置spark.driver.maxResultSize
,所以我设置了spark.driver.maxResultSize=2G
。然后,我对连接条件稍作更改,错误再次出现。
编辑:在调整集群大小时,我还将 .coalesce(256)
中的 DataFrame 假定的分区数量翻了一番,达到了 .coalesce(512)
,所以我不能确定不是因为这个.
我的问题是,既然我没有向司机收集任何东西,为什么spark.driver.maxResultSize
在这里很重要?驱动程序的内存是否用于我不知道的连接中的某些内容?
【问题讨论】:
遇到同样的问题,请问您有什么进展吗? @user4601931 你能粘贴你正在运行的实际 scala 代码吗?val joined = uniqueDates.join(df, $"start_date" <= $"date" && $"date" <= $"end_date")
行不会运行任何作业。您必须进行一些触发工作的转换。
您能检查一下joined
中有多少个分区吗?像joined.queryExecution.toRdd.getNumPartitions
这样的东西。我很好奇你为什么有78021 tasks
。是否更好的解决方案是减少连接中数据集的分区数量?
@JacekLaskowski 不幸的是,我已经没有这个项目的代码了,而且时间太长了,我已经忘记了它的大部分内容。抱歉,但感谢您对这个问题再次感兴趣。
@JacekLaskowski 我无法在此处显示查询计划,但它崩溃的阶段包含 +3000 个任务。很多 FileScanRDD
后跟 MapPartitionsRDD
。然后很多UnionRDD
。最后对所有联合的结果进行不同的操作。但是没有(广播)加入或收集......我当然可以看到为什么这个执行计划不理想,但不是spark.driver.maxResultSize
进来的地方。当--deploy-mode cluster
设置时没有崩溃。
【参考方案1】:
仅仅因为您没有明确收集任何东西并不意味着没有收集任何东西。由于问题发生在连接期间,最可能的解释是执行计划使用广播连接。在这种情况下,Spark 会先收集数据,然后再广播它。
取决于配置和管道:
确保spark.sql.autoBroadcastJoinThreshold
小于spark.driver.maxResultSize
。
确保您不要force broadcast join 处理未知大小的数据。
虽然没有任何迹象表明这是这里的问题,但在使用 Spark ML 实用程序时要小心。其中一些(最显着的索引器)可以为驱动程序带来大量数据。
要确定广播是否确实是问题,请检查执行计划,如果需要,删除广播提示并禁用自动广播:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
【讨论】:
我想 Spark 应该足够聪明,不会使用超过maxResultSize
的广播连接,除非你明确告诉它,或者除非你愚蠢地将 autoBroadcastJoinThreshold
更改为更高的值?
如何禁用自动广播?
@thentangler 设置 spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
禁用自动广播【参考方案2】:
理论上,异常并不总是与客户数据相关。
关于任务执行结果的技术信息以序列化的形式发送到驱动节点,这些信息可能会占用比阈值更多的内存。
证明: 错误消息位于 org.apache.spark.scheduler.TaskSetManager#canFetchMoreResults
val msg = s"Total size of serialized results of $calculatedTasks tasks " +
在 org.apache.spark.scheduler.TaskResultGetter#enqueueSuccessfulTask 中调用的方法
val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match
case directResult: DirectTaskResult[_] =>
if (!taskSetManager.canFetchMoreResults(serializedData.limit()))
return
如果任务数量很大,可能会发生上述异常。
【讨论】:
我很确定您正在做某事,但您能否详细说明该技术信息是什么以及该段代码如何证明它?我对 Spark 的内部结构不是很熟悉,所以对我来说,TaskResultGetter
的目的似乎是将任务执行给驱动程序的计算的实际结果返回。这只对collect
、take
、也许count
、...等操作是必需的。
如何检查:在本地模式下只运行small join debug,在指定位置设置断点,出现一些技术信息(至少是一些累加器)以上是关于超过 `spark.driver.maxResultSize` 而没有给驱动程序带来任何数据的主要内容,如果未能解决你的问题,请参考以下文章
当一个 pod 资源限制没有超过,而单个容器的资源限制超过了,会发生啥?