将 Spark 数据帧写入带分区的 CSV

Posted

技术标签:

【中文标题】将 Spark 数据帧写入带分区的 CSV【英文标题】:Write Spark dataframe as CSV with partitions 【发布时间】:2016-05-29 12:30:45 【问题描述】:

我正在尝试将 Spark 中的数据帧写入 HDFS 位置,我希望如果我添加 partitionBy 符号 Spark 将创建分区 (类似于 Parquet 格式的写法)

格式的文件夹
partition_column_name=partition_value

(即partition_date=2016-05-03)。为此,我运行了以下命令:

(df.write
    .partitionBy('partition_date')
    .mode('overwrite')
    .format("com.databricks.spark.csv")
    .save('/tmp/af_organic'))

但尚未创建分区文件夹 知道我该怎么做才能让 spark DF 自动创建这些文件夹吗?

谢谢,

【问题讨论】:

【参考方案1】:

Spark 2.0.0+

内置的 csv 格式支持开箱即用的分区,因此您应该能够简单地使用:

df.write.partitionBy('partition_date').mode(mode).format("csv").save(path)

不包括任何额外的软件包

火花:

目前 (v1.4.0) spark-csv 不支持 partitionBy(请参阅 databricks/spark-csv#123),但您可以调整内置源以实现您想要的。

您可以尝试两种不同的方法。假设您的数据相对简单(没有复杂的字符串并且需要字符转义)并且看起来或多或少像这样:

df = sc.parallelize([
    ("foo", 1, 2.0, 4.0), ("bar", -1, 3.5, -0.1)
]).toDF(["k", "x1", "x2", "x3"])

您可以手动准备要写入的值:

from pyspark.sql.functions import col, concat_ws

key = col("k")
values = concat_ws(",", *[col(x) for x in df.columns[1:]])

kvs = df.select(key, values)

并使用text source 编写代码

kvs.write.partitionBy("k").text("/tmp/foo")

df_foo = (sqlContext.read.format("com.databricks.spark.csv")
    .options(inferSchema="true")
    .load("/tmp/foo/k=foo"))

df_foo.printSchema()
## root
## |-- C0: integer (nullable = true)
## |-- C1: double (nullable = true)
## |-- C2: double (nullable = true)

在更复杂的情况下,您可以尝试使用适当的 CSV 解析器以类似的方式预处理值,方法是使用 UDF 或映射到 RDD,但成本会高得多。

如果 CSV 格式不是硬性要求,您还可以使用支持 partitionBy 开箱即用的 JSON 编写器:

df.write.partitionBy("k").json("/tmp/bar")

以及在读取时发现分区。

【讨论】:

以上是关于将 Spark 数据帧写入带分区的 CSV的主要内容,如果未能解决你的问题,请参考以下文章

Spark Job 卡住了将数据帧写入分区 Delta 表

将大型 Spark 数据帧从数据块写入 csv 失败

PySpark:如何将具有 SparseVector 类型的列的 Spark 数据帧写入 CSV 文件?

将 PySpark 数据帧写入分区 Hive 表

Apache Spark 数据帧在写入镶木地板时不会重新分区

将火花数据帧写入固定宽度文件java spark