Spark Dataframe Write to CSV 在独立集群模式下创建 _temporary 目录文件
Posted
技术标签:
【中文标题】Spark Dataframe Write to CSV 在独立集群模式下创建 _temporary 目录文件【英文标题】:Spark Dataframe Write to CSV creates _temporary directory file in Standalone Cluster Mode 【发布时间】:2019-02-04 22:29:29 【问题描述】:我在一个有 2 个工作节点的集群中运行 spark job
!我正在使用下面的代码(spark java)将计算的数据帧作为 csv 保存到工作节点。
dataframe.write().option("header","false").mode(SaveMode.Overwrite).csv(outputDirPath);
我试图了解 spark 如何在每个工作节点上写入多个部分文件。
Run1) worker1
有 part files
和 SUCCESS
; worker2
有 _temporarty/task*/part*
每个任务都有部分文件运行。
Run2) worker1
有部分文件和_temporary
目录; worker2
有 multiple part files
谁能帮我理解为什么会出现这种行为?
1)我是否应该将outputDir/_temporary
中的记录与part files in outputDir
一起视为输出文件的一部分?
2) 作业运行后是否应该删除_temporary
目录并将part
文件移动到outputDir
?
3)为什么不能直接在输出目录下创建零件文件?
coalesce(1)
和 repartition(1)
不能作为选项,因为 outputDir 文件本身将在 500GB
附近
Spark 2.0.2. 2.1.3
和 Java 8, no HDFS
【问题讨论】:
您是否保存到分布式文件系统?还是直接在每个工作节点上?你可以看看这个问题,可能是相关的:***.com/questions/51603404/… 这只是Q 51603404; Spark 需要一个跨所有工作人员的通用文件系统,否则事情就会被破坏。以某种方式设置共享存储,即使只是 NFS 【参考方案1】:TL;DR 要使用基于文件系统的源正确写入(或读取)数据,您需要共享存储。
_temporary
目录是 Spark 使用的基本提交机制的一部分 - 数据首先写入临时目录,一旦所有任务完成,自动移动到最终目的地。你可以在Spark _temporary creation reason阅读更多关于这个过程的信息
要使此过程成功,您需要一个共享文件系统(HDFS、NFS 等)或等效的分布式存储(如 S3)。由于您没有,因此预计无法清除临时状态 - Saving dataframe to local file system results in empty results。
当某些执行程序与驱动程序位于同一位置并与驱动程序共享文件系统时,您观察到的行为(数据部分提交和部分未提交)可能会发生,从而为数据子集启用完全提交。
【讨论】:
【参考方案2】:经过分析,观察到我的 spark 作业使用的是默认的fileoutputcommitter version 1
。
然后我包含了使用fileoutputcommitter version 2
而不是version 1
的配置,并在AWS 的10 节点spark 独立集群中进行了测试。所有part-* files
都是在dataframe.write().option("header","false").mode(SaveMode.Overwrite).csv(outputDirPath)
指定的outputDirPath
下直接生成的
我们可以设置属性
通过在spark-submit command
中包含与--conf 'spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2'
相同的内容
或使用 sparkContext javaSparkContext.hadoopConifiguration().set("mapreduce.fileoutputcommitter.algorithm.version","2")
设置属性
我了解spark docs 中所述的失败后果,但我达到了预期的结果!
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version,默认值为1 这 文件输出提交者算法版本,有效算法版本 number: 1 或 2。版本 2 可能有更好的性能,但版本 1 在某些情况下可以更好地处理故障,根据 MAPREDUCE-4815。
【讨论】:
【参考方案3】:多个部分文件基于您的数据框分区。写入的文件或数据的数量取决于您写出数据时 DataFrame 的分区数。默认情况下,每个数据分区写入一个文件。
您可以通过使用合并或重新分区来控制它。您可以减少或增加分区。
如果你将 1 合并,那么你不会在其中看到多个部分文件,但这会影响并行写入数据。
[outputDirPath = /tmp/multiple.csv]
dataframe
.coalesce(1)
.write.option("header","false")
.mode(SaveMode.Overwrite)
.csv(outputDirPath);
关于如何引用它的问题..
以下所有部分请参考/tmp/multiple.csv
。
/tmp/multiple.csv/part-00000.csv
/tmp/multiple.csv/part-00001.csv
/tmp/multiple.csv/part-00002.csv
/tmp/multiple.csv/part-00003.csv
【讨论】:
以上是关于Spark Dataframe Write to CSV 在独立集群模式下创建 _temporary 目录文件的主要内容,如果未能解决你的问题,请参考以下文章
Spark- How to concatenate DataFrame columns