Apache Spark 无法读取使用流式作业编写的 parquet 文件夹

Posted

技术标签:

【中文标题】Apache Spark 无法读取使用流式作业编写的 parquet 文件夹【英文标题】:Apache Spark can't read parquet folder that is being written with streaming job 【发布时间】:2019-07-27 08:23:47 【问题描述】:

当我尝试使用选项“mergeSchema”:“true”读取当前正在使用另一个 spark 流作业写入的 parquet 文件夹时,我收到一个错误:

java.io.IOException: Could not read footer for file
val df = spark
    .read
    .option("mergeSchema", "true")
    .parquet("path.parquet")

如果没有架构合并,我可以很好地读取文件夹,但是是否可以通过架构合并读取这样的文件夹,而不管可能的副业是否更新它?

完全例外:

java.io.IOException: Could not read footer for file: FileStatuspath=hdfs://path.parquet/part-00000-20199ef6-4ff8-4ee0-93cc-79d47d2da37d-c000.snappy.parquet; isDirectory=false; length=0; replication=0; blocksize=0; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:551)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:538)
    at org.apache.spark.util.ThreadUtils$$anonfun$3$$anonfun$apply$1.apply(ThreadUtils.scala:287)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: hdfs://path.parquet/part-00000-20199ef6-4ff8-4ee0-93cc-79d47d2da37d-c000.snappy.parquet is not a Parquet file (too small length: 0)
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:514)
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:505)
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:499)
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:476)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:544)
    ... 9 more

【问题讨论】:

你需要mergeSchema做什么? @JacekLaskowski 将 parquet 文件与不同的字段集结合起来,因为模式演变 “用另一个 spark 流作业编写” 中的流作业的输出格式是什么?你能包括整个例外吗?在您尝试读取批处理作业中的文件时,流作业是否已启动并正在运行? 刚刚使用示例流式查询和批处理查询检查了您的用例,一切正常。你如何开始流式查询?如何开始批处理作业? 看起来 Spark 在不合并模式的情况下会忽略不完整的文件,但启用此选项后,它会尝试使用不完整的文件以某种方式导致异常 【参考方案1】:

在创建数据框之前运行以下操作:

spark.sql("设置 spark.sql.files.ignoreCorruptFiles=true")

即启用此配置 - spark.sql.files.ignoreCorruptFiles

如here 所述,如果此配置为真,Spark 作业将在遇到损坏或不存在的文件时继续运行,并且仍然会返回已读取的内容。此外,merge schema flow 使用此配置。

可从 Spark 2.1.1+ 获得

【讨论】:

以上是关于Apache Spark 无法读取使用流式作业编写的 parquet 文件夹的主要内容,如果未能解决你的问题,请参考以下文章

Spark流式传输作业不会删除随机播放文件

Spark 流式传输作业在被驱动程序停止后失败

无法为大型数据集运行 Spark 作业

使用 Apache Spark Ba​​tch 对 Apache Kafka 进行偏移管理

如何确保我的 Apache Spark 设置代码只运行一次?

Spark结构化流 - 使用模式从文件中读取时间戳