将大型 Spark 数据帧作为镶木地板写入 s3 存储桶
Posted
技术标签:
【中文标题】将大型 Spark 数据帧作为镶木地板写入 s3 存储桶【英文标题】:Writing large spark data frame as parquet to s3 bucket 【发布时间】:2021-01-21 17:32:42 【问题描述】:我的情景
我在 AWS 粘合作业中有一个包含 400 万条记录的 spark 数据框 我需要在 AWS s3 中将其编写为 SINGLE parquet 文件当前代码
file_spark_df.write.parquet("s3://"+target_bucket_name)
问题 上面的代码创建了 100+ 个文件,每个文件的大小在 17.8 到 18.1 MB 之间,猜测它的一些默认分解大小
问题 1:如何只创建一个文件?对于一个火花数据框? 我检查了https://spark.apache.org/docs/latest/sql-data-sources-parquet.html 没有找到要设置的参数
问题 2:如何指定文件名 我试过了……
file_df.write.parquet("s3://"+target_bucket_name+"/"+target_file_name)
它在 "s3://"+target_bucket_name+"/"+target_file_name 中创建了 100 多个文件
问题 3:如何指定文件名 我需要在 base3 存储桶中创建子文件夹,下面的代码可以完成这项工作
file_df.write.parquet("s3://"+target_bucket_name+"/"+today_date+"/"+target_file_name)
不确定这是最好的方法……还是有更好的方法?
【问题讨论】:
使用coalesce(1)
写入一个文件:file_spark_df.coalesce(1).write.parquet("s3_path")
.
感谢这有效... 2,3 上的任何输入?我还需要检查文件夹是否已经存在...如果是,请先删除然后写....您可以发表您的评论作为答案,以便我可以接受它
要指定输出文件名,您必须重命名 Spark 编写的 part*
文件。例如写入临时文件夹、列出零件文件、重命名并移动到目标。你可以看到我的另一个answer。
【参考方案1】:
使用.repartition(1)
或如@blackbishop 所说,coalesce(1) 表示“我只想要输出上的一个分区”
它应该看起来像这样
val dest = "s3://"+target_bucket_name + "/subdir"
val destPath = newPath(dest)
val fs = Filesystem.get(destPath, conf) // where conf is the hadoop conf from your spark conf
fs.delete(destPath, true)
file_spark_df.parquet.repartition(1).write.(dest)
// at this point there should be only one file in the dest dir
val files = fs.listStatus(destPath) // array of fileStatus of size == 1
if (fs.size != 1) throw new IOException("Wrong number of files in " + destPath)
fs.rename(files[0].getPath(), new Path(destPath, "final-filename.parquet")
(注意,代码是@控制台编写的,没有编译、测试等。不过你应该明白)
【讨论】:
with repartition(1) 我得到 AttributeError: 'NoneType' object has no attribute 'repartition 什么是文件系统?我需要导入一些东西吗?有没有办法可以指定目标文件名...我不想要随机文件名 org.apache.hadoop.fs.FileSystem.以上是关于将大型 Spark 数据帧作为镶木地板写入 s3 存储桶的主要内容,如果未能解决你的问题,请参考以下文章
如何在 python 的 S3 中从 Pandas 数据帧写入镶木地板文件
Apache Spark 数据帧在写入镶木地板时不会重新分区