以 parquet 格式保存数据帧会生成太多小文件
Posted
技术标签:
【中文标题】以 parquet 格式保存数据帧会生成太多小文件【英文标题】:Saving a dataframe in the parquet format generates too many small files 【发布时间】:2018-03-15 08:20:43 【问题描述】:使用 Spark Sql,我将镶木地板格式的 spark df 保存在配置单元表中。 问题是这个操作总是生成大约 200 个 3 MB 的 part-* 文件。在处理过程中如何控制输出文件的数量和分区的数量?据我所知,每个分区都会生成一个 part-* 文件。
我的工作流程是这样的:
-
我使用
spark.read.parquet("")
从 Hive 元存储中的表中读取了一个分区
然后我从这个 df 创建一个 TempView
然后我用纯 sql 做我需要的转换,它有很多连接:spark.sql("SQL QUERY")
然后我尝试对结果进行 repartition 或 coalesce,但这不起作用,因为 part-* 文件的数量将始终为 200
我用spark.sql(" insert overwrite partition() ... ")
保存我的df
我使用纯 sql 的原因是,如果尝试使用 Dataframe API 保存我的 df,我会失去列的顺序。
我在 CDH 11.2 中使用 Spark 2.2.0
如果我的问题中有一些非慈善机构,请告诉我。
UPDATE1: 示例源代码
conf = (SparkConf()
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
.set("spark.sql.parquet.compression.codec","SNAPPY")
.set("spark.executor.cores", "3")
.set("spark.executor.instances", "10")
.set("spark.executor.memory", "20g")
)
sc = SparkContext(conf=conf)
spark = SparkSession \
.builder \
.config(conf=conf) \
.getOrCreate()
df=spark.read.parquet("/user/hive/warehouse/schema.db/tbl/year=2000/month=01/day=01").cache()
temp_table_name="tmp_bal2_nor_20000101"
df.createOrReplaceTempView(temp_table_name)
aaa=spark.sql(
""" select DISTINCT t.A,
t.lot,
t.of,
t.columns,
.
.,
from temp_table_name t
left join(
...
) tmp1 on ...
left join(
...
) tmp2 on ...
left join(
...
) tmp3 ...
left join(
...
) tmp4 on ...
left join(
...
) tmp5 on ...
left join(
...
) tmp6 on ...
order by t.A, t.lot """
)
aaa.coalesce(10).createOrReplaceTempView(temp_table_name)
spark.sql(""" insert overwrite table tbl1
PARTITION(year=2000, month=01, day=01)
select DISTINCT t.A,
t.lot,
t.of,
t.columns
from temp_table_name t
""")
spark.stop()
【问题讨论】:
你好@sdikby coalesce 应该可以工作。您需要显示作者的代码才能得出结论。可以发一些代码吗? @AlexandrosBiratsis 请检查我的更新。 【参考方案1】:尝试更改代码以在使用合并后保存 parquet 文件。如下:
aaa.coalesce(10).write.parquet("your_path")
否则,通常会忽略合并,因为在减少分区后,您正在使用 aaa 数据集创建 TempView。目前您有 200 个分区,因为 spark.sql.shuffle.partitions 设置为 200,这是 Spark SQL 的默认分区数。 coalesce 只是您的数据集的一部分,因此如果您想正确使用它,您需要直接通过您的数据集进行操作。
【讨论】:
如果我听从您的建议,那么我的数据将无法通过 hive 元存储访问(我试图通过 HUE 查看它们,但出现错误)。第二个问题是.write.parquet("path")
,列的顺序和我的目标表不一样
你可以保存你的数据,然后将其恢复到你的 TempView 作为替代
你的意思是在我进行转换之后,我合并我的 df 将它保存在一个临时目录中,然后重新读取它,最后我会 spark.sql("insert overwrite into ...") 吗?我会测试它,但有没有更“直接”的方法?
据我所知调用coalesce后直接创建TempView的方法,这里需要,没有
@sdikby 抱歉,我没有意识到您需要保留您的 spark sql 部分,因为我认为我们正在讨论是否应该用数据框替换它,我的错。因此,请不要保存任何数据框,而是建议更改 spark.sql.shuffle.partitions
以修改 spark sql 的默认分区号。您可以尝试将其设置为较低的数字以上是关于以 parquet 格式保存数据帧会生成太多小文件的主要内容,如果未能解决你的问题,请参考以下文章