读取 PySpark 中的所有分区 parquet 文件
Posted
技术标签:
【中文标题】读取 PySpark 中的所有分区 parquet 文件【英文标题】:Read all partitioned parquet files in PySpark 【发布时间】:2019-12-04 09:11:59 【问题描述】:我想加载存储在 S3 AWS 文件夹结构中的所有 parquet 文件。
文件夹结构如下:S3/bucket_name/folder_1/folder_2/folder_3/year=2019/month/day
我想要的是一次读取所有 parquet 文件,因此我希望 PySpark 读取 2019 年所有可用月份和日期的所有数据,然后将其存储在一个数据帧中(这样你就可以得到一个连接/联合的数据帧2019 年的所有日子)。
我被告知这些是分区文件(尽管我不确定)。
这在 PySpark 中是否可行?如果可以,如何实现?
当我尝试spark.read.parquet('S3/bucket_name/folder_1/folder_2/folder_3/year=2019')
有用。但是,当我想查看使用 spark.read.parquet('S3/bucket_name/folder_1/folder_2/folder_3/year=2019').show()
的 Spark 数据帧时
上面写着:
An error occurred while calling o934.showString.
: org.apache.spark.SparkException:
Job aborted due to stage failure: Task 0 in stage 36.0 failed 4 times,
most recent failure:
Lost task 0.3 in stage 36.0 (TID 718, executor 7):
java.lang.UnsupportedOperationException:
org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainIntegerDictionary
at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:44)
at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToBinary(ParquetDictionary.java:51)
at org.apache.spark.sql.execution.vectorized.WritableColumnVector.getUTF8String(WritableColumnVector.java:372)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
我希望能够显示数据框。
【问题讨论】:
你试过spark.read.parquet('s3://bucket_name/folder_1/folder_2/folder_3/year=2019/')
吗?
是的,它给了我关于 basePath 的错误消息?我记不太清楚了。奇怪的是,如果我尝试spark.read.parquet('s3://bucket_name/folder_1/folder_2/folder_3/year=2019/month=1')
,它确实需要那个月的所有时间。奇怪吗?
是的,我正在使用 EMR。对 Java 版本一无所知,也不知道我在使用哪个版本
【参考方案1】:
请参考文档的“分区发现”部分: https://spark.apache.org/docs/2.3.1/sql-programming-guide.html#partition-discovery
【讨论】:
【参考方案2】:在 PySpark 中,您可以简单地执行以下操作:
from pyspark.sql.functions import col
(
spark.read
.parquet('S3/bucket_name/folder_1/folder_2/folder_3')
.filter(col('year') == 2019)
)
因此,您将指向文件夹的路径,将其分区为一些子文件夹,然后应用分区过滤器,该过滤器应仅从给定年份子文件夹中获取数据。
【讨论】:
An error occurred while calling o787.showString. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 29.0 failed 4 times, most recent failure: Lost task 0.3 in stage 29.0
这是我得到的错误,当我为您的代码使用 .show()
运算符时
@Eren 您能否在您的问题中发布整个错误消息?
@Eren 看起来有点像数据类型不匹配。您是否使用模式来读取数据?以上是关于读取 PySpark 中的所有分区 parquet 文件的主要内容,如果未能解决你的问题,请参考以下文章
java - 如何在类似于pyspark的java分区中写入parquet文件?