spark sql could not query parquet partitions in S3

Posted: 2020-04-20 20:18:38

Question:

I have 100 parquet_dir/*.snappy.parquet files stored as partitions in AWS S3, about 6 GB in total. I cannot query these partitioned files, while the same query succeeds when reading the same partitioned files written to HDFS. Please suggest how I can handle this.

// Read all partition files under the S3 prefix and cache the resulting DataFrame
val DF = spark.read.parquet("s3a://parquet_dir").cache()
DF.registerTempTable("DF1")
val query1 = sqlContext.sql("select * from DF1").show

Error message:

(0 + 24) / 25]2020-04-21 01:08:41,352 WARN storage.BlockManager: Putting block rdd_7_4 failed due to exception java.io.InterruptedIOException: Failed to open s3a://parquet_dir/part-00077-3c1ec48b-611e-4f96-97ed-10f0fe371dd8-c000.snappy.parquet at 4 on s3a://parquet_dir/part-00077-3c1ec48b-611e-4f96-97ed-10f0fe371dd8-c000.snappy.parquet: com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool.
2020-04-21 01:08:41,353 WARN storage.BlockManager: Block rdd_7_4 could not be removed as it was not found on disk or in memory
2020-04-21 01:08:41,359 ERROR executor.Executor: Exception in task 4.0 in stage 2.0 (TID 128)
java.io.InterruptedIOException: Failed to open s3a://parquet_dir/part-00077-3c1ec48b-611e-4f96-97ed-10f0fe371dd8-c000.snappy.parquet at 4 on s3a://parquet_dir/part-00077-3c1ec48b-611e-4f96-97ed-10f0fe371dd8-c000.snappy.parquet: com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
    at org.apache.hadoop.fs.s3a.S3AUtils.translateInterruptedException(S3AUtils.java:340)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:171)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
    at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:182)
    at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:328)
    at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$2(Invoker.java:190)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
    at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:188)
    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:210)
    at org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:321)
    at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:433)
    at java.io.DataInputStream.read(DataInputStream.java:149)
    at org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:102)
    at org.apache.parquet.io.DelegatingSeekableInputStream.readFullyHeapBuffer(DelegatingSeekableInputStream.java:127)
    at org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:91)
    at org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1174)
    at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:805)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:301)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:256)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:159)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anon$1.hasNext(InMemoryRelation.scala:125)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1165)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Comments:

Answer 1:

Click here! I tried coalescing the DataFrame that reads the partitioned files and it worked.
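A minimal sketch of that suggestion, assuming the same S3 prefix as in the question; the target partition count (24) is an arbitrary illustration, not a value from the answer:

// Coalescing reduces the number of partitions processed concurrently,
// so fewer simultaneous S3 connections are held open per executor.
val coalescedDF = spark.read.parquet("s3a://parquet_dir")
  .coalesce(24)   // illustrative partition count, tune for your data
  .cache()
coalescedDF.createOrReplaceTempView("DF1")
spark.sql("select * from DF1").show()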

Comments:

Answer 2:

You have hit the limit of the connection pool, and some other operations are taking so long that the waiting threads time out.

Set the option spark.hadoop.fs.s3a.connection.maximum to a value larger than the default (48).
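A sketch of how that option could be set when building the session; the value 200 is only an illustrative choice, not a recommendation from the answer:

import org.apache.spark.sql.SparkSession

// Raise the S3A HTTP connection pool above the default so parallel
// parquet reads do not time out waiting for a free connection.
val spark = SparkSession.builder()
  .appName("read-s3-parquet")
  .config("spark.hadoop.fs.s3a.connection.maximum", "200")
  .getOrCreate()

The same setting can also be passed at submit time with --conf spark.hadoop.fs.s3a.connection.maximum=200.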

Comments:
