EMR 5.28 无法在 s3 上加载镶木地板文件

Posted

技术标签:

【中文标题】EMR 5.28 无法在 s3 上加载镶木地板文件【英文标题】:EMR 5.28 not able to load parquet files on s3 【发布时间】:2020-04-01 20:07:04 【问题描述】:

在 EMR 集群 5.28.0 上,从 s3 读取 parquet 文件失败并出现以下异常,而在 EMR 5.18.0 上同样可以正常工作。 下面是 EMR 5.28.0 上的堆栈跟踪。

我什至从spark-shell尝试过:

sqlContext.read.load(("s3://s3_file_path/*")
df.take(5) 

但同样的异常失败:

Job aborted due to stage failure: Task 3 in stage 1.0 failed 4 times, most recent failure: Lost task 3.3 in stage 1.0 (TID 17, ip-x.x.x.x.ec2.internal, executor 1): **org.apache.spark.sql.execution.datasources.FileDownloadException: Failed to download file path: s3://somedir/somesubdir/434560/1658_1564419581.parquet, range: 0-7928, partition values: [empty row], isDataPresent: false**
    at org.apache.spark.sql.execution.datasources.AsyncFileDownloader.next(AsyncFileDownloader.scala:142)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.getNextFile(FileScanRDD.scala:241)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:171)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:130)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
**Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.org$apache$spark$sql$execution$datasources$parquet$ParquetFileFormat$$isCreatedByParquetMr(ParquetFileFormat.scala:352)
    at** org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildPrefetcherWithPartitionValues$1.apply(ParquetFileFormat.scala:676)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildPrefetcherWithPartitionValues$1.apply(ParquetFileFormat.scala:579)
    at org.apache.spark.sql.execution.datasources.AsyncFileDownloader.org$apache$spark$sql$execution$datasources$AsyncFileDownloader$$downloadFile(AsyncFileDownloader.scala:93)
    at org.apache.spark.sql.execution.datasources.AsyncFileDownloader$$anonfun$initiateFilesDownload$2$$anon$1.call(AsyncFileDownloader.scala:73)
    at org.apache.spark.sql.execution.datasources.AsyncFileDownloader$$anonfun$initiateFilesDownload$2$$anon$1.call(AsyncFileDownloader.scala:72)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    ... 3 more

我无法找到此文档。有没有人在 EMR 5.28.0 上遇到过这个问题并且能够解决这个问题?

在 5.28 上,我能够读取 EMR 写入 s3 的文件,但读取 parquet-go 写入的现有 parquet 文件会引发上述异常,而它在 EMR 5.18 上工作正常

更新: 在检查 parquet 文件时,仅适用于 5.18 的旧文件缺少统计信息

creator:            null 
file schema:        parquet-go-root 
timestringhr:        BINARY SNAPPY DO:0 FPO:21015 SZ:1949/25676/13.17 VC:1092 ENC:RLE,BIT_PACKED,PLAIN ST:[no stats for this column]
timeseconds:         INT64 SNAPPY DO:0 FPO:22964 SZ:1397/9064/6.49 VC:1092 ENC:RLE,BIT_PACKED,PLAIN ST:[min: 1564419460, max: 1564419581, num_nulls not defined]

那些同时在 EMR 5.18 和 5.28 上工作的人像

creator:            parquet-mr version 1.10.0 (build 031a6654009e3b82020012a18434c582bd74c73a) 
extra:              org.apache.spark.sql.parquet.row.metadata = <schema_here>    
timestringhr:        BINARY SNAPPY DO:0 FPO:3988 SZ:156/152/0.97 VC:1092 ENC:PLAIN_DICTIONARY,RLE,BIT_PACKED ST:[min: 2019-07-29 16:00:00, max: 2019-07-29 16:00:00, num_nulls: 0]
timeseconds:         INT64 SNAPPY DO:0 FPO:4144 SZ:954/1424/1.49 VC:1092 ENC:PLAIN_DICTIONARY,RLE,BIT_PACKED ST:[min: 1564419460, max: 1564419581, num_nulls: 0]

这可能会导致 NullPointerException 。发现与 parquet-mr 相关的问题https://issues.apache.org/jira/browse/PARQUET-1217。我可以尝试在类路径中包含 parquet 的更新版本,或者在 EMR 6 beta 上进行测试,看看是否能解决问题。

【问题讨论】:

我在从 EMR-5.20.0 升级到 EMR-5.29.0 后遇到了类似的问题 【参考方案1】:

尝试将created_by 值添加到页脚。我将一个 NPE 追溯到 Spark 中的 footer/created_by 检查。如果您使用的是xitongsys/parquet-go,请考虑:

var writer_version = "parquet-go version 1.0"
...

...
pw, err := writer.NewJSONWriter(schemaStr, fw, 4)
pw.Footer.CreatedBy = &writer_version

【讨论】:

我只想评论一下,通过在 Go 代码中添加 pw.Footer.CreatedBy 是有效的【参考方案2】:

确保您的 parquet 文件不包含行数为零的行组。您可能必须在文件加载时使用阅读器对其进行调试。我们在 AWS Glue 中遇到了“非法行组 0 行”。

修复: 我们使用 Parquet.net nuget 包,如果行组不包含数据,则限制它写入行组。

【讨论】:

以上是关于EMR 5.28 无法在 s3 上加载镶木地板文件的主要内容,如果未能解决你的问题,请参考以下文章

无法使用 Pyspark 2.4.4 读取 s3 存储桶中的镶木地板文件

spark sql 无法在 S3 中查询镶木地板分区

将镶木地板从 AWS Kinesis firehose 写入 AWS S3

如何在 python 的 S3 中从 Pandas 数据帧写入镶木地板文件

在python中使用s3 select解析多个镶木地板文件?

如何将镶木地板文件从 s3 导入到 postgresql rds