Apache Spark 无法读取使用流式作业编写的 parquet 文件夹
Posted
技术标签:
【中文标题】Apache Spark 无法读取使用流式作业编写的 parquet 文件夹【英文标题】:Apache Spark can't read parquet folder that is being written with streaming job 【发布时间】:2019-07-27 08:23:47 【问题描述】:当我尝试使用选项“mergeSchema”:“true”读取当前正在使用另一个 spark 流作业写入的 parquet 文件夹时,我收到一个错误:
java.io.IOException: Could not read footer for file
val df = spark
.read
.option("mergeSchema", "true")
.parquet("path.parquet")
如果没有架构合并,我可以很好地读取文件夹,但是是否可以通过架构合并读取这样的文件夹,而不管可能的副业是否更新它?
完全例外:
java.io.IOException: Could not read footer for file: FileStatuspath=hdfs://path.parquet/part-00000-20199ef6-4ff8-4ee0-93cc-79d47d2da37d-c000.snappy.parquet; isDirectory=false; length=0; replication=0; blocksize=0; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:551)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:538)
at org.apache.spark.util.ThreadUtils$$anonfun$3$$anonfun$apply$1.apply(ThreadUtils.scala:287)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: hdfs://path.parquet/part-00000-20199ef6-4ff8-4ee0-93cc-79d47d2da37d-c000.snappy.parquet is not a Parquet file (too small length: 0)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:514)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:505)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:499)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:476)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:544)
... 9 more
【问题讨论】:
你需要mergeSchema
做什么?
@JacekLaskowski 将 parquet 文件与不同的字段集结合起来,因为模式演变
“用另一个 spark 流作业编写” 中的流作业的输出格式是什么?你能包括整个例外吗?在您尝试读取批处理作业中的文件时,流作业是否已启动并正在运行?
刚刚使用示例流式查询和批处理查询检查了您的用例,一切正常。你如何开始流式查询?如何开始批处理作业?
看起来 Spark 在不合并模式的情况下会忽略不完整的文件,但启用此选项后,它会尝试使用不完整的文件以某种方式导致异常
【参考方案1】:
在创建数据框之前运行以下操作:
spark.sql("设置 spark.sql.files.ignoreCorruptFiles=true")
即启用此配置 - spark.sql.files.ignoreCorruptFiles
如here 所述,如果此配置为真,Spark 作业将在遇到损坏或不存在的文件时继续运行,并且仍然会返回已读取的内容。此外,merge schema flow 使用此配置。
可从 Spark 2.1.1+ 获得
【讨论】:
以上是关于Apache Spark 无法读取使用流式作业编写的 parquet 文件夹的主要内容,如果未能解决你的问题,请参考以下文章
使用 Apache Spark Batch 对 Apache Kafka 进行偏移管理