如何使用 Pyspark 并行处理多个镶木地板文件?

Posted

技术标签:

【中文标题】如何使用 Pyspark 并行处理多个镶木地板文件?【英文标题】:How to process multiple parquet files in parallel using Pyspark? 【发布时间】:2020-01-03 20:32:26 【问题描述】:

我正在使用 Azure Databricks,而且我是 Pyspark 和大数据的新手。

这是我的问题:

我在 azure databricks 上的目录中有几个 parquet 文件。 我想将这些文件读入 pyspark 数据框并使用 drop duplicates 方法删除重复的行 - 一个 QA 检查。 然后我想在删除重复项后覆盖同一目录中的这些文件。

目前,我正在使用 for 循环来遍历目录中的每个 parquet 文件。然而,这是一种低效的做事方式。我想知道是否有一种方法可以并行处理这些镶木地板文件以节省计算时间。如果是这样,我需要如何更改我的代码。

代码如下:

for parquet_file_name in dir:
     df = spark.read.option("header", "true").option("inferschema", "false").parquet('/'.format(dir,parquet_file_name))
     df.dropDuplicates().write.mode('overwrite').parquet('/'.format(dir,parquet_file_name)

我们将不胜感激。

非常感谢。

【问题讨论】:

【参考方案1】:

与其在 for 循环中一次读取一个文件,不如像这样读取整个目录。

 df = spark.read \
    .option("header", "true") \
    .option("inferschema", "false").parquet(dir)


df.dropDuplicates().write.mode('overwrite').parquet(dir)

现在将按预期一次性读取所有数据。如果要更改写入的文件数,请在 .write 函数之前使用 coalesce 命令,如下所示:df.dropDuplicates().coalesce(4).write.mode('overwrite').parquet(dir)

【讨论】:

警告:Spark 无法将数据帧写入与其源相同的目录。 Spark 读取延迟,mode('overwrite') 会破坏数据帧所代表的底层数据。 @CharlieFlowers 我不认为您对 Spark 无法写入与源目录相同的目录是正确的。这可能不是一个好的做法,但上述功能按预期工作。 你确定吗?它可能取决于文件系统,但使用 Spark 2.4.3: >>> df = spark.read \ ... .option("header", "true") \ ... .option("inferschema", "false ").parquet("tst_parquet") >>> df.dropDuplicates().write.mode('overwrite').parquet("tst_parquet") 20/01/03 18:06:35 错误执行器:任务 0.0 中的异常在阶段 1.0 (TID 1) java.io.FileNotFoundException: File file:/Users/charlie/tst_parquet/part-00003-fcdf8911-53f0-4139-868b-81048e20b896-c000.snappy.parquet 可能不存在文件已更新。您可以显式地使 ... 无效 @CharlieFlowers 啊,我承认它可能不适用于从像 hdfs / dbfs 这样的文件系统中读取。我使用 S3 中的文件对其进行了测试,并且可以正常工作。我还使用 Spark 2.4.3 进行了测试。

以上是关于如何使用 Pyspark 并行处理多个镶木地板文件?的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark 中写入镶木地板的问题

pyspark 使用动态日期范围读取镶木地板文件分区数据

在 s3 pyspark 作业中创建单个镶木地板文件

我对镶木地板文件和 python 完全陌生,谁能告诉我如何在 pyspark 中读取带有标题的镶木地板文件

如何在读取前根据定义的模式读取 pyspark 中的镶木地板文件?

如何从 pyspark 数据框的模式属性(来自镶木地板文件)中获取特定字段名称的数据类型?