如何在写入hive orc表时合并spark中的小文件

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何在写入hive orc表时合并spark中的小文件相关的知识,希望对你有一定的参考价值。

我正在从s3读取csv文件并将其作为orc写入hive表。在写作时,它写的是很多小文件。我需要合并所有这些文件。我有以下属性集:

 spark.sql("SET hive.merge.sparkfiles = true")
 spark.sql("SET hive.merge.mapredfiles = true")
 spark.sql("SET hive.merge.mapfiles = true")
 spark.sql("set hive.merge.smallfiles.avgsize = 128000000")
 spark.sql("set hive.merge.size.per.task = 128000000")

除了这些配置,我尝试重新分区(1)和coalesce(1),它将合并到单个文件中,但它删除了hive表并再次创建它。

 masterFile.repartition(1).write.mode(SaveMode.Overwrite).partitionBy(<partitioncolumn>).orc(<HIVEtablePath>);

如果我使用Append模式而不是Overwrite,它会在每个分区下创建复制文件。

  masterFile.repartition(1).write.mode(SaveMode.Append).partitionBy(<partitioncolumn>).orc(<HIVEtablePath>);

在这两种情况下,spark作业都会运行两次而在第二次执行时会失败

有没有什么方法可以使用Append模式重新分区/合并而不会在每个分区中重复部分文件?

答案
masterFile.repartition(1).write.mode(SaveMode.Overwrite).partitionBy(<partitioncolumn>).orc(<HIVEtablePath>)

.orc()方法将数据写为文件而不是触摸元信息。所以它无法覆盖HIVE中的表格。

如果您想覆盖hive表中的数据使用方法.insertInto(hive_table_name),其中hive_table_name是HIVE中表的全名(schema + table_name)

根据你的例子

masterFile.repartition(1).write.mode(SaveMode.Overwrite).partitionBy(<partitioncolumn>).insertInto(hiveTableName)

也可以用元数据信息覆盖数据。具有覆盖修饰符的方法.saveAsTable(hive_table_name)也将覆盖Metastore中的数据。

masterFile.repartition(1).write.mode(SaveMode.Overwrite).partitionBy(<partitioncolumn>).saveAsTable(hiveTableName)

以上是关于如何在写入hive orc表时合并spark中的小文件的主要内容,如果未能解决你的问题,请参考以下文章

Spark Sql 从 Hive orc 分区表中读取,给出数组越界异常

如何合并 sparksql 保存在 hive 上的小文件?

将 Spark DataFrame 写入 Hive 表时的内存分配问题

查询Spark同时加载的hive表时如何避免错误

火花小兽人条纹

Spark小文件异步合并工具类