Spark SQL cannot query Parquet partitions in S3
Posted: 2020-04-20 20:18:38

Question: I have 100 partitioned files, parquet_dir/*.snappy.parquet, in AWS S3; the data is 6 GB in size. I am unable to query these partitioned files, yet the same query succeeds when reading the same partitioned files written to HDFS. Please suggest how I can deal with this.
val DF = spark.read.parquet("s3a://parquet_dir").cache()
DF.registerTempTable("DF1")
val query1=sqlContext.sql("select * from DF1").show
Error message:
(0 + 24) / 25]2020-04-21 01:08:41,352 WARN storage.BlockManager: Putting block rdd_7_4 failed due to exception java.io.InterruptedIOException: Failed to open s3a://parquet_dir/part-00077-3c1ec48b-611e-4f96-97ed-10f0fe371dd8-c000.snappy.parquet at 4 on s3a://parquet_dir/part-00077-3c1ec48b-611e-4f96-97ed-10f0fe371dd8-c000.snappy.parquet: com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool.
2020-04-21 01:08:41,353 WARN storage.BlockManager: Block rdd_7_4 could not be removed as it was not found on disk or in memory
2020-04-21 01:08:41,359 ERROR executor.Executor: Exception in task 4.0 in stage 2.0 (TID 128)
java.io.InterruptedIOException: Failed to open s3a://parquet_dir/part-00077-3c1ec48b-611e-4f96-97ed-10f0fe371dd8-c000.snappy.parquet at 4 on s3a://parquet_dir/part-00077-3c1ec48b-611e-4f96-97ed-10f0fe371dd8-c000.snappy.parquet: com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
at org.apache.hadoop.fs.s3a.S3AUtils.translateInterruptedException(S3AUtils.java:340)
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:171)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:182)
at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:328)
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$2(Invoker.java:190)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:188)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:210)
at org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:321)
at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:433)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:102)
at org.apache.parquet.io.DelegatingSeekableInputStream.readFullyHeapBuffer(DelegatingSeekableInputStream.java:127)
at org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:91)
at org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1174)
at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:805)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:301)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:256)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:159)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anon$1.hasNext(InMemoryRelation.scala:125)
at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1165)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Answer 1: Tried coalescing the DataFrame that reads the partitioned files, and it worked.
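A minimal sketch of that approach, assuming a spark-shell session like the one in the question (the partition count of 8 is an arbitrary example, not taken from the original answer):

// Coalesce to fewer partitions so fewer concurrent S3A reads compete for connections
val df = spark.read.parquet("s3a://parquet_dir").coalesce(8)
df.createOrReplaceTempView("DF1")
spark.sql("select * from DF1").show()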
Answer 2: You have hit the limit of the connection pool, and some other operation is taking so long that threads waiting for a connection time out.
Set the option spark.hadoop.fs.s3a.connection.maximum to something larger than its default (48).
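One way to apply that setting is when building the SparkSession (a sketch; the values 200 and 64 are illustrative, and fs.s3a.threads.max is an extra knob often raised alongside it, not something this answer mentions):

import org.apache.spark.sql.SparkSession

// Enlarge the S3A HTTP connection pool so concurrent Parquet reads
// are not left waiting for a free connection.
val spark = SparkSession.builder()
  .appName("query-s3-parquet")
  .config("spark.hadoop.fs.s3a.connection.maximum", "200") // example value; default per this answer is 48
  .config("spark.hadoop.fs.s3a.threads.max", "64")         // example value
  .getOrCreate()

The same option can also be passed on the command line, e.g. spark-submit --conf spark.hadoop.fs.s3a.connection.maximum=200.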