仅从 Spark Scala DataFrame 写入标头 CSV 记录

Posted

技术标签:

【中文标题】仅从 Spark Scala DataFrame 写入标头 CSV 记录【英文标题】:Write Header only CSV record from Spark Scala DataFrame 【发布时间】:2018-06-07 14:02:33 【问题描述】:

我的要求是使用 Spark Scala DataFrame 仅写入 Header CSV 记录。谁能帮我解决这个问题。

val OHead1 = "/xxxxx/xxxx/xxxx/xxx/OHead1/" 
val sc = sparkFile.sparkContext
val outDF = csvDF.select("col_01", "col_02", "col_03").schema
sc.parallelize(Seq(outDF.fieldNames.mkString("\t"))).coalesce(1).saveAsTextFile(s"$OHead1")

The above one is working and able to create header in the CSV with tab delimiter. Since I am using spark session I am creating sparkContext in the second line. outDF is my dataframe created before these statements.
Two things are outstanding, can you one of you help me.

1. The above working code is not overriding the files, so every time I need to delete the files manually. I could not find override option, can you help me.
2. Since I am doing a select statement and schema, will it be consider as action and start another lineage for this statement. If it is true then this would degrade the performance.

【问题讨论】:

sc.parallelize(Seq(df.columns.mkString(","))).saveAsTextFile 我正在使用 sparksession,因此并行化时出错不是成员。我从数据框中只选择了 10 列中的 3 列作为输出。 【参考方案1】:

如果您只需要输出标题,您可以使用以下代码:

df.schema.fieldNames.reduce(_ + "," + _)

它将使用列名创建一行 CSV

【讨论】:

我可以打印这个输出。我希望将其写入 CSV 文件。由于此字符串找不到写入文件的选项。此外,我从数据框中仅选择 10 列中的 3 列作为输出。 如果你 select 只有 3 列 - 你不需要从数据框模式中获取它。您可以将这些名称写入文件中。 Here 是如何写入文本文件的示例。【参考方案2】:
I tested and the solution below did not affect any performance.

val OHead1 = "/xxxxx/xxxx/xxxx/xxx/OHead1/" 
val sc = sparkFile.sparkContext
val outDF = csvDF.select("col_01", "col_02", "col_03").schema
sc.parallelize(Seq(outDF.fieldNames.mkString("\t"))).coalesce(1).saveAsTextFile(s"$OHead1")

【讨论】:

【参考方案3】:

我找到了处理这种情况的解决方案。在配置文件中定义列并将这些列写入文件中。这是片段。

val Header = prop.getProperty("OUT_HEADER_COLUMNS").replaceAll("\"","").replaceAll(",","\t")
scala.tools.nsc.io.File(s"$HeadOPath").writeAll(s"$Header")

【讨论】:

两种解决方案都有效。我会推荐第一个解决方案(从架构中选择列),并且不要从属性文件中硬编码值。

以上是关于仅从 Spark Scala DataFrame 写入标头 CSV 记录的主要内容,如果未能解决你的问题,请参考以下文章

Spark (Scala) - 在 DataFrame 中恢复爆炸

scala spark dataframe 修改字段类型

spark dataframe 和 scala Map互相转换

Spark-Scala:使用异常处理将固定宽度线解析为 Dataframe Api

Spark/Scala:对带有数组类型列的 DataFrame 中的某些组件的操作

使用Scala在Spark中创建DataFrame时出错