由于空间问题,Spark 作业失败

Posted

技术标签:

【中文标题】由于空间问题,Spark 作业失败【英文标题】:Spark job failing due to space issue 【发布时间】:2017-11-24 13:02:32 【问题描述】:

我正在使用 pyspark 在 Spark 中编写批处理程序。 以下是输入文件及其大小

base-track.dat (3.9g)
base-attribute-link.dat (18g)
base-release.dat (543m)

这些是每行一个记录的文本文件,每个字段由一个特殊字符分隔(参考代码)

我正在对属性链接执行一些过滤操作并将它们分组并与其他表连接。

我通过 spark-submit 将该程序提交到一个 Hadoop 集群,该集群有 9 个由 Ambari 管理的数据节点。 每个数据节点包含 140 GB 的 RAM 和 3.5 TB 的磁盘空间。

以下是我的 pyspark 代码

import sys

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

if __name__ == "__main__":
        sc = SparkContext(appName = "Tracks")
        sqlContext = SQLContext(sc)

        #Load base-track
        track = sc.textFile("base-track/input").map(lambda row: row.split(u'\u0001'))

        #Load base-attribute-link
        attlnk = sc.textFile("base-attribute-link/input").map(lambda row: row.split(u'\u0001'))

        #Load base-release
        release = sc.textFile("base-release/input").map(lambda row: row.split(u'\u0001'))

        attlnk = attlnk.filter(lambda row: row[2] == 'MA0000000162')

        attlnkg = attlnk.groupBy(lambda row: row[1])

        attlnkmax = attlnkg.map( lambda t: (t[0],max([v[4] for v in t[1]])) )

        alg = attlnkmax.map(lambda r: Row(al_objectid=r[0],al_value=r[1]))

        aldf = alg.toDF()

        track = track.map(lambda r:Row(t_tag = r[0], t_trackid= r[1], t_releaseid= r[2], t_songid = r[3], t_med= r[4], t_ph = r[5], t_tn = r[5], t_title= r[5], t_part= r[6], t_dur = r[7], t_pick = r[8], t_amgclid  = r[9], t_amgpopid = r[10], t_compid = r[11], t_muzid = r[12], t_perfid= r[13], t_albumid = r[14]))

        trackdf = track.toDF()

        release = release.map(lambda r:Row(r_tag = r[0], r_relid = r[1], r_albumid = r[2], r_mediafmtid = r[3], r_prodfmtid = r[4], r_reldate = r[5], r_prodcode = r[6], r_prodtypeid = r[7], r_label = r[8], r_relyear = r[9], r_ispurch = r[10], r_amgclassid = r[11], r_amgpopid = r[12], r_eanid = r[13], r_upcid = r[14]))

        releasedf = release.toDF()

        trackaldf = trackdf.join(aldf, trackdf['t_trackid'] == aldf['al_objectid'], 'left_outer')


        tracksdf = trackaldf.join(releasedf, trackaldf['t_releaseid'] == releasedf['r_relid'])

        tracksdf = tracksdf.select('t_trackid', 't_releaseid', 't_songid', 't_med', 't_ph', 't_tn', 't_title', 't_part', 't_dur', 't_pick', 't_amgclid', 't_amgpopid', 't_compid', 't_muzid', 'al_objectid', 't_perfid', 't_albumid', 'r_label')


        tracksdf.rdd.map(lambda x: u"\u0001".join(map(str, x))).coalesce(100).saveAsTextFile("tracks-out")

尝试执行此操作时出现以下一堆错误。

ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /tmp/blockmgr-d88c631e-cec3-4b83-8af6-a38b109b5e3b/0e/temp_shuffle_7dbda3ac-48b1-4c4a-89c7-64eb5d858d90
java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:326)
    at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at org.xerial.snappy.SnappyOutputStream.flush(SnappyOutputStream.java:336)
    at org.apache.spark.io.SnappyOutputStreamWrapper.flush(CompressionCodec.scala:209)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:141)
    at java.io.DataOutputStream.flush(DataOutputStream.java:123)
    at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.flush(UnsafeRowSerializer.scala:83)
    at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$1.apply$mcV$sp(DiskBlockObjectWriter.scala:157)
    at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$1.apply(DiskBlockObjectWriter.scala:154)
    at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$revertPartialWritesAndClose$1.apply(DiskBlockObjectWriter.scala:154)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1239)
    at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(DiskBlockObjectWriter.scala:161)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.stop(BypassMergeSortShuffleWriter.java:232)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

有几个关于 SO、here 和 here 的问题与同一问题相关。

这是我从上述两个问题中尝试过的。 我尝试将 spark.yarn.executor.memoryOverhead 从 384 MB 增加到 4GB。

SPARK_JAVA_OPTS+=" -Dspark.local.dir=/mnt/spark,/mnt2/spark -Dhadoop.tmp.dir=/mnt/ephemeral-hdfs"

export SPARK_JAVA_OPTS

第一个没有任何效果。如果我添加 java opts,则会收到 /mnt 目录不存在的错误。

在多个论坛(包括databricks)上阅读了有关此问题的信息后,有些模糊的想法是该作业正在尝试创建临时文件作为每个集群节点的/tmp 上的shuffle 的一部分并耗尽空间。在每个集群节点上,我们为存在 tmp 目录的根 (/) 分区分配了 100 GB。

一个多月以来,我一直在努力通过使用各种 spark 配置参数来执行此操作。作为调整的一部分,我将 spark.driver 和 spark.executor 内存增加到 16g,后来增加到 64g。还将火花纱执行器内存增加到 4GB。不幸的是,这些都不能解决空间问题。

任何有关如何进一步进行的指导都会有很大帮助。

[Edit-1] 我只是在检查所有机器上根目录的磁盘空间,我们集群中的 9 个节点中有 7 个为根目录分配了 100+GB,但在 2 个节点上只分配了 10 GB,只剩下 6+GB。这可能会导致磁盘空间问题,如果可以扩展根目录的大小,我将不得不与我们的 IT 团队核实。

[Edit-2] 我与 IT 团队合作将所有机器上的根分区大小扩展到 100+GB,但问题仍然存在,可能是 100GB 的 /tmp 空间也不足以胜任这项工作。我估计这个作业的输出大约是 4.6GB。

【问题讨论】:

***.com/questions/43848019/… @BDR 我尝试将自己的临时目录定义到其他有 300 GB 空间的目录,但仍然是同样的问题 此问题即将发生或在一段时间内发生? 我猜这是在将输出存储到文本文件时发生的。在此之前它会运行几分钟 你能在运行期间在一个节点上“观察 df”,看看哪些 fs 实际被填满了吗? 【参考方案1】:

鉴于您的错误的性质以及您正在对数十 GB 的数据执行大型连接这一事实,其中 spark 工作人员将在其洗牌时将中间数据写入磁盘,因此 100 GB 的磁盘似乎还不够。我建议为默认的 worker_dir 和 local_dirs 分配更多的磁盘,方法是将它们挂载到更大的磁盘或配置更大的根磁盘。另外,请注意,如果 spark 未正确关闭,则此中间数据​​可能会持续存在并占用工作节点上的大量空间。因此,您可能必须检查这些目录并删除所有过时的文件。如果您在 AWS r3、c3 或具有大型临时 SSD 磁盘的类似实例类型上运行 spark-standalone,我建议 mounting 这些磁盘说“mnt”和“mnt2”,configuring spark 暂存空间指向那些挂载,而不是(通常)较小的根卷。例如:

SPARK_LOCAL_DIRS=/mnt
SPARK_WORKER_DIR=/mnt2

【讨论】:

所以这些 /mnt 和 /mnt2 目录应该是空的,只是为了暂存空间?或者我们可以使用共享目录吗? 我们的机器是使用 jbod 配置(/jbod1、/jbod2 等)设置的,它们有很多磁盘空间。但是当我将 SPARK_LOCAL_DIRS 和 SPARK_WORKER_DIR 指向这些目录时,我收到错误“无法在 /jbod1 中创建本地目录” 这些目录可以是您的 spark 集群有权写入的任何目录。我会确保运行您的 spark 集群的用户具有写入 /jbod1 的权限,并确保该用户没有达到您的系统管理员设置的任何配额。【参考方案2】:

我发现我没有将 spark 作业提交到集群,而是单台机器,因此存在磁盘空间问题。我总是通过以下方式提交我的脚本

spark-submit tracks.py

由于我希望我的脚本在 Hadoop 集群上执行并使用 Yarn 作为资源管理器,我将我的提交命令更改为以下,然后它工作正常。

spark-submit --master yarn --deploy-mode cluster tracks.py

【讨论】:

很高兴您自己解决了这个问题。 关于 SO 有多个与此类似的问题,但我的问题和解决方案是独一无二的。我希望这对像我这样的人有用。

以上是关于由于空间问题,Spark 作业失败的主要内容,如果未能解决你的问题,请参考以下文章

由于临时空间已满,作业失败

由于阶段失败,Pyspark 作业中止错误

Spark设备上没有剩余空间

为啥 Spark 作业失败并显示“退出代码:52”

Spark:作业重启和重试

Spark 流式传输作业在被驱动程序停止后失败