在 S3 中将每个分区数据写入单个文件中

Posted

技术标签:

【中文标题】在 S3 中将每个分区数据写入单个文件中【英文标题】:Write each partition data in a single file in S3 【发布时间】:2018-03-14 14:55:53 【问题描述】:

我们有一个用例,我们希望按列值对数据框进行分区,然后将每个分区写入单个文件。我做了以下事情来做同样的事情:

val df = spark.read.format("csv").load("hdfs:///tmp/PartitionKeyedDataset.csv")

df.repartition($"_c1")

df.rdd.saveAsTextFile("s3://dfdf/test1234")

当我这样做时:

df.rdd.partitions.size 

我只得到 62 个分区。但是,该列的不同值是 10,214(通过运行 df.select("_c1").distinct.count 得到)

我不能使用:

df.write.partitionBy("_c1").save("s3://dfdf/test123")

因为这会在目标中创建具有分区名称的文件夹。我们不想要这个。我们只想转储文件。

【问题讨论】:

你只想要一个文件吗? 每个分区一个文件 【参考方案1】:

我犯了一个不使用新变量的愚蠢错误。因此,我看到了相同数量的分区。以下是更新后的代码:

val df = spark.read.format("csv").load("hdfs:///tmp/PartitionKeyedDataset.csv")

df.repartition($"_c1")

df.rdd.saveAsTextFile("s3://dfdf/test1234")

默认情况下,repartition 只会创建 200 个分区,因为 spark.sql.shuffle.partitions 的默认值为 200。我已将此值设置为我想要分区的列的唯一值的数量。

spark.conf.set("spark.sql.shuffle.partitions", "10214")

在这之后,我得到了 10214 个分区,写操作在 S3 中创建了 10214 个文件。

【讨论】:

【参考方案2】:

您需要将新数据框分配给一个变量并使用它。目前在您的代码中,repartition 部分实际上并没有做任何事情。

val df = spark.read.format("csv").load("hdfs:///tmp/PartitionKeyedDataset.csv")
val df2 = df.repartition($"_c1")
df2.rdd.saveAsTextFile("s3://dfdf/test1234")

虽然可以更改spark.sql.shuffle.partitions 设置,但不够灵活。

【讨论】:

以上是关于在 S3 中将每个分区数据写入单个文件中的主要内容,如果未能解决你的问题,请参考以下文章

数据框无法在 S3 上写入

将 Dask 分区写入单个文件

s3 存储桶中的分区数据格式

在 s3 pyspark 作业中创建单个镶木地板文件

胶水作业无法写入文件

awswrangler 将镶木地板数据帧写入单个文件