如何在 Spark 中对 DataFrame 进行分区和写入而不删除没有新数据的分区?
Posted
技术标签:
【中文标题】如何在 Spark 中对 DataFrame 进行分区和写入而不删除没有新数据的分区?【英文标题】:How to partition and write DataFrame in Spark without deleting partitions with no new data? 【发布时间】:2017-07-08 03:34:32 【问题描述】:我正在尝试使用 DataFrameWriter
以 Parquet 格式将 DataFrame
保存到 HDFS,并按三列值分区,如下所示:
dataFrame.write.mode(SaveMode.Overwrite).partitionBy("eventdate", "hour", "processtime").parquet(path)
如this question 中所述,partitionBy
将删除path
中的全部现有分区层次结构,并将其替换为dataFrame
中的分区。由于特定日期的新增量数据会定期出现,因此我只想替换层次结构中dataFrame
拥有数据的分区,而其他分区保持不变。
为此,我似乎需要使用其完整路径单独保存每个分区,如下所示:
singlePartition.write.mode(SaveMode.Overwrite).parquet(path + "/eventdate=2017-01-01/hour=0/processtime=1234567890")
但是,我无法理解将数据组织到单个分区 DataFrame
s 的最佳方法,以便我可以使用它们的完整路径将它们写出来。一个想法是这样的:
dataFrame.repartition("eventdate", "hour", "processtime").foreachPartition ...
但foreachPartition
是在Iterator[Row]
上运行的,这对于写入 Parquet 格式并不理想。
我还考虑使用select...distinct eventdate, hour, processtime
来获取分区列表,然后按每个分区过滤原始数据帧并将结果保存到它们的完整分区路径。但是不同的查询加上每个分区的过滤器似乎不是很有效,因为它会是很多过滤器/写入操作。
我希望有一种更简洁的方法来保留dataFrame
没有数据的现有分区?
感谢阅读。
Spark 版本:2.1
【问题讨论】:
看看这个方法***.com/a/50170392/4390959。 【参考方案1】:模式选项Append
有一个问题!
df.write.partitionBy("y","m","d")
.mode(SaveMode.Append)
.parquet("/data/hive/warehouse/mydbname.db/" + tableName)
我已经测试并看到这将保留现有的分区文件。但是,这次的问题如下:如果您运行相同的代码两次(使用相同的数据),那么它将创建新的 parquet 文件,而不是替换现有的相同数据的文件(Spark 1.6)。因此,我们仍然可以使用Overwrite
来解决这个问题,而不是使用Append
。我们应该在分区级别覆盖,而不是在表级别覆盖。
df.write.mode(SaveMode.Overwrite)
.parquet("/data/hive/warehouse/mydbname.db/" + tableName + "/y=" + year + "/m=" + month + "/d=" + day)
查看以下链接了解更多信息:
Overwrite specific partitions in spark dataframe write method
(我在 suriyanto 发表评论后更新了我的回复。Thnx。)
【讨论】:
您是否测试过两次写入相同数据时是否会替换旧分区?根据我的测试,它实际上在分区目录中创建了一个新的 parquet 文件,导致数据加倍。我在 Spark 2.2 上。 我有同样的问题,我不希望数据被复制。您是否克服了重复数据的问题? 看一看这个 SO-answer 说明Append
: ***.com/a/51020951/3757672 会出现这种行为【参考方案2】:
我知道这已经很老了。由于我看不到任何发布的解决方案,我将继续发布一个。这种方法假定您在要写入的目录上有一个配置单元表。
处理此问题的一种方法是从 dataFrame
创建一个临时视图,该视图应添加到表中,然后使用普通的类似 hive 的 insert overwrite table ...
命令:
dataFrame.createOrReplaceTempView("temp_view")
spark.sql("insert overwrite table table_name partition ('eventdate', 'hour', 'processtime')select * from temp_view")
它保留旧分区,同时(过度)写入新分区。
【讨论】:
这对我来说不太有效,但让我非常接近(在 spark 2.2 上)。如果要确保现有分区不被覆盖,则必须在 SQL 语句中静态指定分区的值,并添加 IF NOT EXISTS,如下所示:spark.sql("insert overwrite table table_name partition (col1='1', col2='2', ) IF NOT EXISTS select * from temp_view")
顺便说一句,我确实看到了这个其他线程:***.com/a/49691528/834644 特定于 2.3。虽然我看到另一位评论者说它不起作用。
2.3 覆盖特定分区肯定有效,我已经使用了一段时间。有关该功能的更多信息:issues.apache.org/jira/browse/SPARK-20236
@sethcall 提议的解决方案在 2.1 中运行良好,但在 2.2 中尚未检查。【参考方案3】:
这是一个老话题,但我遇到了同样的问题并找到了另一个解决方案,只需使用以下命令将分区覆盖模式设置为动态:
spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
所以,我的 spark 会话是这样配置的:
spark = SparkSession.builder.appName('AppName').getOrCreate()
spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
【讨论】:
仅供参考,将 partitionOverwriteMode 设置为“动态”以某种方式使我们集群上的整个写入过程非常缓慢(长 3 倍)。我们使用的是火花 2.4.0。不确定新版本是否已修复此问题。 这应该被标记为真正的解决方案。也许它更慢,但它可以满足 OP 的要求。 fyispark.conf.set('spark.sql.sources.partitionOverwriteMode', 'static')
原始模式
在 Databricks 9.1 LTS(包括 Apache Spark 3.1.2、Scala 2.12)上运行并且没有发现性能下降以上是关于如何在 Spark 中对 DataFrame 进行分区和写入而不删除没有新数据的分区?的主要内容,如果未能解决你的问题,请参考以下文章
是否可以在 Pyspark 中对 DataFrame 进行子类化?
我可以将 Pyspark RDD 用作 Pandas DataFrame 吗? Pyspark/spark 在数据分析中对 Pandas 的限制?