使用 pyspark 重新分区失败并出现错误
Posted
技术标签:
【中文标题】使用 pyspark 重新分区失败并出现错误【英文标题】:Re partitioning using pyspark failing with error 【发布时间】:2020-04-29 13:54:25 【问题描述】:我在下面列的 s3 文件夹中有镶木地板。镶木地板的大小约为 40 mb。
org_id, device_id, channel_id, source, col1, col2
现在分区在 3 列 org_id device_id channel_id
我想把分区改成source, org_id, device_id, channel_id.
我正在使用 pyspark 从 s3 读取文件并写入 s3 存储桶。
sc = SparkContext(appName="parquet_ingestion1").getOrCreate()
spark = SparkSession(sc)
file_path = "s3://some-bucket/some_folder"
print("Reading parquet from s3:".format(file_path))
spark_df = spark.read.parquet(file_path)
print("Converting to parquet")
file_path_re = "s3://other_bucket/re-partition"
partition_columns = ["source", "org_id", "device_id", "channel_id "]
spark_df.repartition(1).write.partitionBy(partition_columns).mode('append').parquet(file_path_re)
我收到错误,没有生成 parquet 文件。
spark_df.repartition(1).write.partitionBy(partition_columns).mode('append').parquet(file_path_re)
[Stage 1:> (0 + 8) / 224]20/04/29 13:29:44 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, ip-172-31-43-0.ap-south-1.compute.internal, executor 3): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainFloatDictionary
at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)
at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToBinary(ParquetDictionary.java:51)
at org.apache.spark.sql.execution.vectorized.WritableColumnVector.getUTF8String(WritableColumnVector.java:380)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
然后我尝试了
spark_df.write.partitionBy(partition_columns).mode('append').parquet(file_path_re)
spark_df.write.partitionBy(partition_columns).mode('append').parquet(file_path_re)
[Stage 3:> (0 + 8) / 224]20/04/29 13:32:11 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 23, ip-172-31-42-4.ap-south-1.compute.internal, executor 5): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainFloatDictionary
at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)
at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToBinary(ParquetDictionary.java:51)
at org.apache.spark.sql.execution.vectorized.WritableColumnVector.getUTF8String(WritableColumnVector.java:380)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[Stage 3:==> (8 + 8) / 224]20/04/29 13:32:22 WARN TaskSetManager: Lost task 0.2 in stage 3.0 (TID 40, ip-172-31-42-4.ap-south-1.compute.internal, executor 5): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainFloatDictionary
at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)
at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToBinary(ParquetDictionary.java:51)
在第二种情况下,它失败了,但它也在创建镶木地板。现在我不确定它是否正确地将所有数据创建到新分区。 让我知道重新划分镶木地板的正确方法是什么。
更新 1:
from pyspark.sql.types import StringType
for col1 in partition_columns:
spark_df=spark_df.withColumn(col1, col(col1).cast(dataType=StringType()))
都试过了 spark_df.repartition(1).write.partitionBy(partition_columns).mode('append').parquet(file_path_re)
spark_df.write.partitionBy(partition_columns).mode('append').parquet(file_path_re) 我收到以下错误
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 20, ip-172-31-42-4.ap-south-1.compute.internal, executor 4): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainFloatDictionary
at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)
at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToBinary(ParquetDictionary.java:51)
at org.apache.spark.sql.execution.vectorized.WritableColumnVector.getUTF8String(WritableColumnVector.java:380)
更新 2:
现在我发现其中一列存在架构不匹配,一列是字符串,另一列是浮点数。我已经描述了下面的场景。 在这里您可以看到 col1 列是一行中的字符串,另一行是浮动的
org_id, device_id, channel_id, source, col1, col2
"100" "device1" "channel" "source1" 10 0.1
"100" "device1" "channel" "source2" "10" 0.1
我尝试将 col1 列转换为 float.it dodn;t 工作
任何建议。
【问题讨论】:
您可以尝试强制读取模式到您的数据框,将所有列保留为字符串,然后运行 df.show(),因为其他列也可能出现错误? @ShubhamJain 更新了问题 【参考方案1】:尝试强制类型将所有partition_columns
转换为StringType
【讨论】:
【参考方案2】:UPDATE2 中提到了问题的根本原因。在我的情况下,我们有 4 个应用程序(基于源的不同管道的一部分)写入镶木地板商店。 2 应用程序 APP1 和 APP2 不使用 col1 和 APP 3 用于将其写为浮点数。 最近 APP4 开始在他们的数据中获取 col1 并将其作为字符串存储在 parquet.parquet 中不要在写作时抱怨。 在阅读这样的镶木地板时
-
我尝试投射它没有成功
合并架构因数据类型不匹配而失败
我尝试根据源类型过滤数据。从某种意义上说,如果过滤掉 APP4 数据,它会部分起作用。但如果过滤掉 APP3 数据,它就不起作用。
这可能不是一个好的解决方案,但我现在不得不满足于此。
解决方案: 1.过滤掉app4源数据,创建数据框并转换成parquet,然后只过滤数据框中的app4源parquet,去掉col1,转换成parquet。
-
或从整个数据帧中删除 col 并写入 parquet。
df1 =df.select([c for c in df.columns if c!= 'col1'])
【讨论】:
由于 parquet 包含数据类型信息,因此我们甚至无法在读取时明确将其全部设为字符串。如果可能的话,将两个源生成的镶木地板文件保存在两个不同的位置,一旦将它们作为两个数据框读取,您可以通过将列转换为一次读取来合并两个数据框,您可以轻松地进行转换。以上是关于使用 pyspark 重新分区失败并出现错误的主要内容,如果未能解决你的问题,请参考以下文章