用修改后的 PySpark DataFrame 覆盖现有 Parquet 数据集

Posted

技术标签:

【中文标题】用修改后的 PySpark DataFrame 覆盖现有 Parquet 数据集【英文标题】:overwrite existing Parquet dataset with modified PySpark DataFrame 【发布时间】:2021-12-03 06:33:48 【问题描述】:

用例是将列附加到 Parquet 数据集,然后在同一位置有效地重写。这是一个最小的例子。

创建一个pandas DataFrame 并作为分区 Parquet 数据集写入。

import pandas as pd
df = pd.DataFrame(
        'id': ['a','a','a','b','b','b','b','c','c'],
        'value': [0,1,2,3,4,5,6,7,8])
path = r'c:/data.parquet'
df.to_parquet(path=path, engine='pyarrow', compression='snappy', index=False, partition_cols=['id'], flavor='spark')

然后将 Parquet 数据集加载为 pyspark 视图,并将修改后的数据集创建为 pyspark DataFrame。

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.read.parquet(path).createTempView('data')
sf = spark.sql(f"""SELECT id, value, 0 AS segment FROM data""")

此时sf 数据与df 数据相同,但有一个额外的全为零的segment 列。我想用sf 有效地覆盖path 的现有Parquet 数据集,作为同一位置的Parquet 数据集。下面是什么不起作用。也不愿将sf 写入新位置,删除旧的 Parquet 数据集,并重命名为似乎效率不高。

# saves existing data and new data
sf.write.partitionBy('id').mode('append').parquet(path)
# immediately deletes existing data then crashes
sf.write.partitionBy('id').mode('overwrite').parquet(path)

【问题讨论】:

这能回答你的问题吗? How to overwrite a parquet file from where DataFrame is being read in Spark 该问题没有公认的答案,只是建议重新编写整个数据集然后删除可能引入大量开销的原始数据集,或者加载到内存然后覆盖,这可能并不总是规模。 【参考方案1】:

我的简短回答:你不应该:\

大数据的一个原则(火花适用于大数据)是永远不要覆盖任何东西。当然,.mode('overwrite') 是存在的,但这不是正确的用法。

我对它为什么会(应该)失败的猜测:

您添加了一列,因此写入的数据集的格式与当前存储在那里的格式不同。这可能会造成架构混乱 您在处理时覆盖了输入数据。所以 spark 读取一些行,处理它们并覆盖输入文件。但这些文件仍然是其他行处理的输入。

在这种情况下,我通常做的是创建另一个数据集,当没有理由保留旧数据集时(即处理完全完成时),清理它。要删除文件,您可以查看this post on how to delete hdfs files。它应该适用于 spark 可访问的所有文件。但是它在 scala 中,所以我不确定它是否可以适应 pyspark。

请注意,效率不是重写的好理由,它所做的工作更多 简单的写。

【讨论】:

以上是关于用修改后的 PySpark DataFrame 覆盖现有 Parquet 数据集的主要内容,如果未能解决你的问题,请参考以下文章

用修改后的 .copy() 行替换原始 DataFrame 的行:将 .copy() 结果与原始 DataFrame 合并

将小 PySpark DataFrame 写入镶木地板时出现内存错误

如何在 PySpark 中为一个组迭代 Dataframe / RDD 的每一行。?

在 Pyspark 中合并 DataFrame

用列表 Pyspark Dataframe 中的值替换 NA

PySpark - 如何使用连接更新 Dataframe?