以 parquet 格式保存数据帧会生成太多小文件

Posted

技术标签:

【中文标题】以 parquet 格式保存数据帧会生成太多小文件【英文标题】:Saving a dataframe in the parquet format generates too many small files 【发布时间】:2018-03-15 08:20:43 【问题描述】:

使用 Spark Sql,我将镶木地板格式的 spark df 保存在配置单元表中。 问题是这个操作总是生成大约 200 个 3 MB 的 part-* 文件。在处理过程中如何控制输出文件的数量和分区的数量?据我所知,每个分区都会生成一个 part-* 文件。

我的工作流程是这样的:

    我使用spark.read.parquet("") 从 Hive 元存储中的表中读取了一个分区 然后我从这个 df 创建一个 TempView 然后我用纯 sql 做我需要的转换,它有很多连接:spark.sql("SQL QUERY") 然后我尝试对结果进行 repartitioncoalesce,但这不起作用,因为 part-* 文件的数量将始终为 200 我用spark.sql(" insert overwrite partition() ... ")保存我的df

我使用纯 sql 的原因是,如果尝试使用 Dataframe API 保存我的 df,我会失去列的顺序。

我在 CDH 11.2 中使用 Spark 2.2.0

如果我的问题中有一些非慈善机构,请告诉我。

UPDATE1: 示例源代码

conf = (SparkConf()
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
.set("spark.sql.parquet.compression.codec","SNAPPY")
.set("spark.executor.cores", "3")
.set("spark.executor.instances", "10")
.set("spark.executor.memory", "20g")
)

sc = SparkContext(conf=conf)

spark = SparkSession \
    .builder \
    .config(conf=conf) \
    .getOrCreate()

df=spark.read.parquet("/user/hive/warehouse/schema.db/tbl/year=2000/month=01/day=01").cache()

temp_table_name="tmp_bal2_nor_20000101"
df.createOrReplaceTempView(temp_table_name)

aaa=spark.sql(
""" select DISTINCT t.A,
                    t.lot,
                    t.of,
                    t.columns,
                    .
                    .,
    from temp_table_name t 
left join( 
    ...
) tmp1 on ...
left join( 
    ...
) tmp2 on ...
left join( 
    ...
) tmp3 ...
left join( 
    ...
) tmp4 on ...
left join( 
    ...
) tmp5 on ...
left join( 
    ...
) tmp6 on ...
order by t.A, t.lot """
)

aaa.coalesce(10).createOrReplaceTempView(temp_table_name)
spark.sql(""" insert overwrite table tbl1
PARTITION(year=2000, month=01, day=01)
select DISTINCT t.A, 
                 t.lot, 
                 t.of, 
                 t.columns
from temp_table_name t
""")

spark.stop()

【问题讨论】:

你好@sdikby coalesce 应该可以工作。您需要显示作者的代码才能得出结论。可以发一些代码吗? @AlexandrosBiratsis 请检查我的更新。 【参考方案1】:

尝试更改代码以在使用合并后保存 parquet 文件。如下:

 aaa.coalesce(10).write.parquet("your_path")

否则,通常会忽略合并,因为在减少分区后,您正在使用 aaa 数据集创建 TempView。目前您有 200 个分区,因为 spark.sql.shuffle.partitions 设置为 200,这是 Spark SQL 的默认分区数。 coalesce 只是您的数据集的一部分,因此如果您想正确使用它,您需要直接通过您的数据集进行操作。

【讨论】:

如果我听从您的建议,那么我的数据将无法通过 hive 元存储访问(我试图通过 HUE 查看它们,但出现错误)。第二个问题是.write.parquet("path"),列的顺序和我的目标表不一样 你可以保存你的数据,然后将其恢复到你的 TempView 作为替代 你的意思是在我进行转换之后,我合并我的 df 将它保存在一个临时目录中,然后重新读取它,最后我会 spark.sql("insert overwrite into ...") 吗?我会测试它,但有没有更“直接”的方法? 据我所知调用coalesce后直接创建TempView的方法,这里需要,没有 @sdikby 抱歉,我没有意识到您需要保留您的 spark sql 部分,因为我认为我们正在讨论是否应该用数据框替换它,我的错。因此,请不要保存任何数据框,而是建议更改 spark.sql.shuffle.partitions 以修改 spark sql 的默认分区号。您可以尝试将其设置为较低的数字

以上是关于以 parquet 格式保存数据帧会生成太多小文件的主要内容,如果未能解决你的问题,请参考以下文章

从多个火花工人以镶木地板格式保存

使用 parquet 格式时是不是保存了 DataFrame 架构?

使用 hive 生成​​ Parquet 文件

大数据:Parquet文件存储格式

大数据:Parquet文件存储格式

hudi使用cow生成parquet格式用hive查询的问题