Spark - 写入 128 MB 大小的 parquet 文件
Posted
技术标签:
【中文标题】Spark - 写入 128 MB 大小的 parquet 文件【英文标题】:Spark- write 128 MB size parquet files 【发布时间】:2021-05-05 15:18:48 【问题描述】:我有一个超过 10 亿行的 DataFrame (df)
df.coalesce(5)
.write
.partitionBy("Country", "Date")
.mode("append")
.parquet(datalake_output_path)
根据上述命令,我了解到我的 100 个工作节点集群(spark 2.4.5)中只有 5 个工作节点将执行所有任务。使用 coalesce(5) 需要 7 小时才能完成。
我应该尝试repartition
而不是coalesce
?
是否有更快/更有效的方法来写出 128 MB 大小的 parquet 文件,或者我是否需要先计算我的数据帧的大小以确定需要多少个分区。
例如,如果我的数据帧大小为 1 GB,spark.sql.files.maxPartitionBytes = 128MB,我应该先计算No. of partitions required as 1 GB/ 128 MB = approx(8)
,然后再执行 repartition(8) 或 coalesce(8) 吗?
这个想法是在撰写本文时最大化输出中 parquet 文件的大小,并且能够快速(更快)这样做。
【问题讨论】:
在您的情况下,128MB 的输出文件大小有什么意义,听起来好像这是您可以容忍的最大文件大小? 我只是想避免“太多小文件”的问题。属性“spark.sql.files.maxPartitionBytes”设置为 128MB,因此我希望分区文件尽可能接近 128MB。例如,我想要 10 个大小为 128 MB 的部分文件,而不是说 64 个大小为 20 MB 的部分文件我还注意到,即使“spark.sql.files.maxPartitionBytes”设置为 128MB,我看到的文件为 200MB , 400MB 在输出路径中。我期待 spark 将它们打包成 128MB 的文件,但这与另一个问题有关 【参考方案1】:如果您从较高的分区数到较低的分区数,则合并会更好。但是,如果在编写 df 之前,您的代码没有执行 shuffle ,那么 coalesce 将被推送到 DAG 中可能的最早点。 您可以做的是在 100 个分区或您认为合适的任何数字中处理您的 df,然后在编写您的 df 之前将其持久化。 然后使用合并将你的分区减少到 5 个并写入它。这可能会给您带来更好的性能
【讨论】:
如果我想要最低数量。每个分区的 parquet 文件数你认为这是个好主意吗:-df.repartition("Country", "Date") .write .partitionBy("Country", "Date") .mode("append") .parquet(datalake_output_path)
Repartitition 控制内存中的分区,而 partitionBy 控制磁盘上的分区。我想您应该指定重新分区中的分区数以及列来控制文件数【参考方案2】:
您可以通过持久化数据框df
来获取其大小 (dfSizeDiskMB
),然后检查 Web UI 上的“存储”选项卡,如 answer 所示。有了这些信息和对预期 Parquet 压缩率的估计,您就可以估计实现所需输出文件分区大小所需的分区数,例如
val targetOutputPartitionSizeMB = 128
val parquetCompressionRation = 0.1
val numOutputPartitions = dfSizeDiskMB * parquetCompressionRatio / targetOutputPartitionSizeMB
df.coalesce(numOutputPartitions).write.parquet(path)
请注意,spark.files.maxPartitionBytes
与此处无关:
读取文件时打包到单个分区的最大字节数。
(除非df
是在没有创建中间数据帧的情况下读取输入数据源的直接结果。df
的分区数更可能由spark.sql.shuffle.partitions
决定,即 Spark 要使用的分区数对于从连接和聚合创建的数据帧)。
我应该尝试重新分区而不是合并吗?
coalesce
通常更好,因为它可以避免与repartition
相关的随机播放,但请注意docs 中的警告,即根据您的用例可能会在上游阶段失去并行性。
【讨论】:
因此,如果我想在代码中执行此操作,则类似于val blockSize= 1024*1024*128 sc.hadoopConfiguration.setInt("dfs.blocksize", blockSize) sc.hadoopConfiguration.setInt("parquet.block.size",blockSize) df.write.option("parquet.block.size", 128 * 1024 * 1024)
以上是关于Spark - 写入 128 MB 大小的 parquet 文件的主要内容,如果未能解决你的问题,请参考以下文章