使用 pyspark 重新分区失败并出现错误

Posted

技术标签:

【中文标题】使用 pyspark 重新分区失败并出现错误【英文标题】:Re partitioning using pyspark failing with error 【发布时间】:2020-04-29 13:54:25 【问题描述】:

我在下面列的 s3 文件夹中有镶木地板。镶木地板的大小约为 40 mb。

org_id, device_id, channel_id, source, col1, col2

现在分区在 3 列 org_id device_id channel_id

我想把分区改成source, org_id, device_id, channel_id. 我正在使用 pyspark 从 s3 读取文件并写入 s3 存储桶。

sc = SparkContext(appName="parquet_ingestion1").getOrCreate()
spark = SparkSession(sc)
file_path = "s3://some-bucket/some_folder"
print("Reading parquet from s3:".format(file_path))
spark_df = spark.read.parquet(file_path)
print("Converting to parquet")
file_path_re = "s3://other_bucket/re-partition"
partition_columns = ["source", "org_id", "device_id", "channel_id "]

spark_df.repartition(1).write.partitionBy(partition_columns).mode('append').parquet(file_path_re)

我收到错误,没有生成 parquet 文件。

spark_df.repartition(1).write.partitionBy(partition_columns).mode('append').parquet(file_path_re)
[Stage 1:>                                                        (0 + 8) / 224]20/04/29 13:29:44 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, ip-172-31-43-0.ap-south-1.compute.internal, executor 3): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainFloatDictionary
        at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToBinary(ParquetDictionary.java:51)
        at org.apache.spark.sql.execution.vectorized.WritableColumnVector.getUTF8String(WritableColumnVector.java:380)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

然后我尝试了

spark_df.write.partitionBy(partition_columns).mode('append').parquet(file_path_re)


spark_df.write.partitionBy(partition_columns).mode('append').parquet(file_path_re)
[Stage 3:>                                                        (0 + 8) / 224]20/04/29 13:32:11 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 23, ip-172-31-42-4.ap-south-1.compute.internal, executor 5): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainFloatDictionary
        at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToBinary(ParquetDictionary.java:51)
        at org.apache.spark.sql.execution.vectorized.WritableColumnVector.getUTF8String(WritableColumnVector.java:380)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
        at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
        at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

[Stage 3:==>                                                      (8 + 8) / 224]20/04/29 13:32:22 WARN TaskSetManager: Lost task 0.2 in stage 3.0 (TID 40, ip-172-31-42-4.ap-south-1.compute.internal, executor 5): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainFloatDictionary
        at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToBinary(ParquetDictionary.java:51)

在第二种情况下,它失败了,但它也在创建镶木地板。现在我不确定它是否正确地将所有数据创建到新分区。 让我知道重新划分镶木地板的正确方法是什么。

更新 1:

from pyspark.sql.types import StringType
for col1 in partition_columns:
    spark_df=spark_df.withColumn(col1, col(col1).cast(dataType=StringType()))             

都试过了 spark_df.repartition(1).write.partitionBy(partition_columns).mode('append').parquet(file_path_re)

spark_df.write.partitionBy(partition_columns).mode('append').parquet(file_path_re) 我收到以下错误

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 20, ip-172-31-42-4.ap-south-1.compute.internal, executor 4): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainFloatDictionary
        at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToBinary(ParquetDictionary.java:51)
        at org.apache.spark.sql.execution.vectorized.WritableColumnVector.getUTF8String(WritableColumnVector.java:380)

更新 2:

现在我发现其中一列存在架构不匹配,一列是字符串,另一列是浮点数。我已经描述了下面的场景。 在这里您可以看到 col1 列是一行中的字符串,另一行是浮动的

org_id, device_id, channel_id, source,    col1, col2
"100"    "device1"  "channel"   "source1"  10    0.1
"100"    "device1"  "channel"   "source2"  "10"  0.1

我尝试将 col1 列转换为 float.it dodn;t 工作

任何建议。

【问题讨论】:

您可以尝试强制读取模式到您的数据框,将所有列保留为字符串,然后运行 ​​df.show(),因为其他列也可能出现错误? @ShubhamJain 更新了问题 【参考方案1】:

尝试强制类型将所有partition_columns 转换为StringType

【讨论】:

【参考方案2】:

UPDATE2 中提到了问题的根本原因。在我的情况下,我们有 4 个应用程序(基于源的不同管道的一部分)写入镶木地板商店。 2 应用程序 APP1 和 APP2 不使用 col1 和 APP 3 用于将其写为浮点数。 最近 APP4 开始在他们的数据中获取 col1 并将其作为字符串存储在 parquet.parquet 中不要在写作时抱怨。 在阅读这样的镶木地板时

    我尝试投射它没有成功 合并架构因数据类型不匹配而失败 我尝试根据源类型过滤数据。从某种意义上说,如果过滤掉 APP4 数据,它会部分起作用。但如果过滤掉 APP3 数据,它就不起作用。

这可能不是一个好的解决方案,但我现在不得不满足于此。

解决方案: 1.过滤掉app4源数据,创建数据框并转换成parquet,然后只过滤数据框中的app4源parquet,去掉col1,转换成parquet。

    或从整个数据帧中删除 col 并写入 parquet。

df1 =df.select([c for c in df.columns if c!= 'col1'])

【讨论】:

由于 parquet 包含数据类型信息,因此我们甚至无法在读取时明确将其全部设为字符串。如果可能的话,将两个源生成的镶木地板文件保存在两个不同的位置,一旦将它们作为两个数据框读取,您可以通过将列转换为一次读取来合并两个数据框,您可以轻松地进行转换。

以上是关于使用 pyspark 重新分区失败并出现错误的主要内容,如果未能解决你的问题,请参考以下文章

重新分区 pyspark 数据帧失败以及如何避免初始分区大小

Pyspark:重新分区与分区

Spark2.4.3 中方法不存在错误导致重新分区失败

PySpark 重新分区 RDD 元素

Pyspark Parquet - 重新分区后排序

使用 pyspark 对 parquet 文件进行分区和重新分区