读取 PySpark 中的所有分区 parquet 文件

Posted

技术标签:

【中文标题】读取 PySpark 中的所有分区 parquet 文件【英文标题】:Read all partitioned parquet files in PySpark 【发布时间】:2019-12-04 09:11:59 【问题描述】:

我想加载存储在 S3 AWS 文件夹结构中的所有 parquet 文件。

文件夹结构如下:S3/bucket_name/folder_1/folder_2/folder_3/year=2019/month/day

我想要的是一次读取所有 parquet 文件,因此我希望 PySpark 读取 2019 年所有可用月份和日期的所有数据,然后将其存储在一个数据帧中(这样你就可以得到一个连接/联合的数据帧2019 年的所有日子)。

我被告知这些是分区文件(尽管我不确定)。

这在 PySpark 中是否可行?如果可以,如何实现?

当我尝试spark.read.parquet('S3/bucket_name/folder_1/folder_2/folder_3/year=2019') 有用。但是,当我想查看使用 spark.read.parquet('S3/bucket_name/folder_1/folder_2/folder_3/year=2019').show() 的 Spark 数据帧时

上面写着:

An error occurred while calling o934.showString.
: org.apache.spark.SparkException: 
Job aborted due to stage failure: Task 0 in stage 36.0 failed 4 times, 
most recent failure: 
Lost task 0.3 in stage 36.0 (TID 718, executor 7): 
java.lang.UnsupportedOperationException: 
org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainIntegerDictionary 
at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:44)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToBinary(ParquetDictionary.java:51)
        at org.apache.spark.sql.execution.vectorized.WritableColumnVector.getUTF8String(WritableColumnVector.java:372)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

我希望能够显示数据框。

【问题讨论】:

你试过spark.read.parquet('s3://bucket_name/folder_1/folder_2/folder_3/year=2019/')吗? 是的,它给了我关于 basePath 的错误消息?我记不太清楚了。奇怪的是,如果我尝试spark.read.parquet('s3://bucket_name/folder_1/folder_2/folder_3/year=2019/month=1'),它确实需要那个月的所有时间。奇怪吗? 是的,我正在使用 EMR。对 Java 版本一无所知,也不知道我在使用哪个版本 【参考方案1】:

请参考文档的“分区发现”部分: https://spark.apache.org/docs/2.3.1/sql-programming-guide.html#partition-discovery

【讨论】:

【参考方案2】:

在 PySpark 中,您可以简单地执行以下操作:

from pyspark.sql.functions import col
(
  spark.read
  .parquet('S3/bucket_name/folder_1/folder_2/folder_3')
  .filter(col('year') == 2019)
)

因此,您将指向文件夹的路径,将其分区为一些子文件夹,然后应用分区过滤器,该过滤器应仅从给定年份子文件夹中获取数据。

【讨论】:

An error occurred while calling o787.showString. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 29.0 failed 4 times, most recent failure: Lost task 0.3 in stage 29.0 这是我得到的错误,当我为您的代码使用 .show() 运算符时 @Eren 您能否在您的问题中发布整个错误消息? @Eren 看起来有点像数据类型不匹配。您是否使用模式来读取数据?

以上是关于读取 PySpark 中的所有分区 parquet 文件的主要内容,如果未能解决你的问题,请参考以下文章

java - 如何在类似于pyspark的java分区中写入parquet文件?

使用 pyspark 对 parquet 文件进行分区和重新分区

pyspark 使用动态日期范围读取镶木地板文件分区数据

Pyspark Parquet - 重新分区后排序

使用 pyspark 同时编写 parquet 文件

Hive 不读取 Spark 生成的分区 parquet 文件