将文件从一个 parquet 分区移动到另一个

Posted

技术标签:

【中文标题】将文件从一个 parquet 分区移动到另一个【英文标题】:Moving files from one parquet partition to another 【发布时间】:2020-09-17 04:00:52 【问题描述】:

我的 S3 存储桶中有大量数据,由 MODULEDATE 两列分区 这样我的镶木地板的文件结构是:

s3://my_bucket/path/file.parquet/MODULE='XYZ'/DATE=2020-01-01

我有 7 个MODULEDATE 的范围从 2020-01-012020-09-01。 我发现数据存在差异,需要更正其中一个模块的 MODULE 条目。基本上我需要将属于MODULEXYZ 的特定索引号的所有数据更改为MODULEABC。 我可以在 pyspark 中通过加载数据框并执行以下操作来做到这一点:

df=df.withColumn('MODULE', when(col('index')==34, "ABC").otherwise(col('MODULE')))

但是我该如何重新分区,以便只有那些被更改的条目才能移动到ABC MODULE 分区?如果我这样做:

df.mode('append').partitionBy('MODULE','DATE').parquet(s3://my_bucket/path/file.parquet")

我会将数据与错误的MODULE 数据一起添加。另外,我有将近一年的数据,不想重新分区整个数据集,因为这需要很长时间。

有没有办法做到这一点?

【问题讨论】:

您可以使用input_file_name 函数来识别您需要更改的文件,因此,您可以只覆盖这些特定文件而不是整个数据。这只是一个想法。 【参考方案1】:

如果我理解得很好,您应该将分区 MODULE=XYZ 中的数据移动到 MODULE=ABC

首先,确定受影响的文件。

from pyspark.sql import functions as F

file_list = df.where(F.col("index") == 34).select(
    F.input_file_name()
).distinct().collect()

然后,您仅基于这些文件创建一个数据框,并使用它来完成MODULE

df = spark.read.parquet(file_list).withColumn(
    "MODULE", when(col("index") == 34, "ABC").otherwise(col("MODULE"))
)

df.write.parquet(
    "s3://my_bucket/path/ABC/", mode="append", partitionBy=["MODULE", "DATE"]
)

此时 ABC 应该没问题(你刚刚添加了缺失的数据),但是 XYZ 应该是错误的,因为重复的数据。要恢复XYZ,只需删除file_list中的文件列表即可。

【讨论】:

谢谢!我不知道input_file_name()。当您说删除file_list 中的文件列表时,您是指在 S3 控制台还是 CLI 中手动删除?还是以编程方式?我将如何以编程方式执行此操作? @thentangler 对我来说,整个脚本都是手动的。因此,您可以使用 S3 python 库。或者提取文件上的路径列表,并创建一个删除脚本以粘贴到 S3 控制台或 CLI 中【参考方案2】:

IIUC 您可以通过过滤该特定索引的数据然后将该数据与日期一起保存为分区来做到这一点。

df=df.withColumn('MODULE', when(col('index')==34, "ABC").otherwise(col('MODULE')))
df = df.filter(col('index')==34)
df.mode('overwrite').partitionBy('DATE').parquet(s3://my_bucket/path/ABC/")

通过这种方式,您最终只会修改已更改的模块,即 ABC

【讨论】:

感谢您提供此解决方案。所以我需要删除模块XYZ 中的数据,对吗?与上面 Steven 提供的解决方案一样。我可以通过编程方式做到这一点吗?

以上是关于将文件从一个 parquet 分区移动到另一个的主要内容,如果未能解决你的问题,请参考以下文章

使用分区的雪花到 Hive 数据移动

spark sql 无法在 S3 中查询镶木地板分区

我如何将每个Parquet行组读入一个单独的分区?

Azure Blob (pyarrow) 上的分区 Parquet 文件

oracle 11g 如何将表从一个表空间移动到另一个表空间

如何让 Spark 使用 Parquet 文件中的分区信息?