Spark SQL SaveMode.Overwrite,获取 java.io.FileNotFoundException 并需要'REFRESH TABLE tableName'

Posted

技术标签:

【中文标题】Spark SQL SaveMode.Overwrite,获取 java.io.FileNotFoundException 并需要\'REFRESH TABLE tableName\'【英文标题】:Spark SQL SaveMode.Overwrite, getting java.io.FileNotFoundException and requiring 'REFRESH TABLE tableName'Spark SQL SaveMode.Overwrite,获取 java.io.FileNotFoundException 并需要'REFRESH TABLE tableName' 【发布时间】:2017-03-21 07:28:11 【问题描述】:

对于spark sql,我们应该如何从HDFS中的一个文件夹中获取数据,进行一些修改,并将更新的数据保存到HDFS中的同一文件夹中通过覆盖保存模式而不得到FileNotFoundException

import org.apache.spark.sql.SparkSession,SaveMode
import org.apache.spark.SparkConf

val sparkConf: SparkConf = new SparkConf()
val sparkSession = SparkSession.builder.config(sparkConf).getOrCreate()
val df = sparkSession.read.parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20")
val newDF = df.select("a","b","c")

newDF.write.mode(SaveMode.Overwrite)
     .parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20") // doesn't work
newDF.write.mode(SaveMode.Overwrite)
     .parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-21") // works

FileNotFoundException 发生在我们从 hdfs 目录 "d=2017-03-20" 读取数据并将更新的数据保存 (SaveMode.Overwrite) 到同一个 hdfs 目录 "d=2017-03-20" 时

Caused by: org.apache.spark.SparkException: Task failed while writing rows
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:204)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:99)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: File does not exist: hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20/part-05020-35ea100f-829e-43d9-9003061-1788904de770.snappy.parquet
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:157)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:243)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188)
  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193)
  ... 8 more

以下尝试仍然出现相同的错误,我应该如何使用 spark sql 解决这个问题?谢谢!

val hdfsDirPath = "hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20"

val df = sparkSession.read.parquet(hdfsDirPath)

val newdf = df
newdf.write.mode(SaveMode.Overwrite).parquet(hdfsDirPath)

// or

val df = sparkSession.read.parquet(hdfsDirPath)
df.createOrReplaceTempView("orgtable")
sparkSession.sql("SELECT * from orgtable").createOrReplaceTempView("tmptable")

sparkSession.sql("TRUNCATE TABLE orgtable")
sparkSession.sql("INSERT INTO orgtable SELECT * FROM tmptable")

val newdf = sparkSession.sql("SELECT * FROM orgtable")
newdf.write.mode(SaveMode.Overwrite).parquet(hdfsDirPath)

// or

val df = sparkSession.read.parquet(hdfsDirPath)
df.createOrReplaceTempView("orgtable")
sparkSession.sql("SELECT * from orgtable").createOrReplaceTempView("tmptable")

sparkSession.sql("REFRESH TABLE orgtable")
sparkSession.sql("ALTER VIEW tmptable RENAME TO orgtable")
    
val newdf = sparkSession.sql("SELECT * FROM orgtable")
newdf.write.mode(SaveMode.Overwrite).parquet(hdfsDirPath)

【问题讨论】:

我觉得不行,需要先保存到其他目录再复制到原目录 我们无法将 Spark Dataframe 写入我们正在读取它的文件。如果您仍想这样做,请先将 DF 写入临时目录,然后将其写入带有 SaveMode.Overwrite 的目录。 谢谢 Akash 和 himanshuIIITian,在 HDFS 上的临时目录中编写 DF 是一个很好的解决方案,但我想知道是否有一种方法可以使用 Spark SQL 来解决这个问题?由于从 HDFS 写入和获取数据的时间和空间效率低于仅在内存中使用 Spark SQL。我们可以使用 REFRESH、TRUNCATE 或 DROP 表来解决问题吗? 我有同样的问题,即使我正在将 df 写入 java 中的临时表 @faustineinsun 你是怎么解决这个问题的?除了临时目录,你有没有找到其他方法? 【参考方案1】:

我解决了这个问题,首先我将我的 Dataframe 写入一个临时目录,并删除我阅读的源,并将临时目录重命名为源名称。问答

【讨论】:

谢谢!我想知道我们是否可以使用 REFRESH、TRUNCATE 或 DROP 表来解决问题?在内存中做操作比在磁盘上做同样的事情效率更高 这是唯一有效的答案。另一个答案是错误的。起初我自己对另一个答案投了赞成票,但无法收回我的赞成票,它说在我编辑问题之前它已被锁定。【参考方案2】:

为什么不读完就缓存一下。将其保存到另一个文件目录然后移动该目录可能需要一些额外的权限。我也一直在强制执行操作,例如 show()。

val myDF = spark.read.format("csv")
    .option("header", "false")
    .option("delimiter", ",")
    .load("/directory/tofile/")


myDF.cache()
myDF.show(2)

【讨论】:

cache()persist(StorageLevel.MEMORY_ONLY) 的别名,这对于大于集群内存的数据集可能不好。 persist(StorageLevel.MEMORY_AND_DISK_ONLY) 会是一个更好的解决方案,但是如果内存不够,它会将数据保存到本地。我之前用过persist,但是好像不行 你尝试在缓存之后调用一个动作吗? “覆盖”仍然是您的第一个操作吗? 这对我不起作用。廖的回答(保存到临时目录)确实有效 你需要缓存,然后使用一个动作(如 show)强制 spark 在做任何其他事情之前缓存数据。在这种情况下,它应该可以工作。使用 Spark 2.4.x 测试 @sdikby 我尝试了同样的方法,使用缓存和显示,但是如果我对数据集进行计数,它会抛出错误。【参考方案3】:

我遇到了类似的问题。我正在使用以下代码将数据框写入配置单元表

dataframe.write.mode("overwrite").saveAsTable("mydatabase.tablename")   

当我尝试查询此表时,我遇到了同样的错误。然后我在创建表后添加了以下代码行以刷新表,从而解决了问题。

spark.catalog.refreshTable("mydatabase.tablename")

【讨论】:

spark.sql("refresh mydatabase.tablename") 对我不起作用,但您的解决方案解决了这个问题。谢谢【参考方案4】:
val dfOut = df.filter(r => r.getAs[Long]("dsctimestamp") > (System.currentTimeMillis() - 1800000))

在上面的代码行中,df 有一个底层 Hadoop 分区。完成此转换后(即转换为 dfOut),直到 dfOut 被垃圾回收后,我才能找到删除、重命名或覆盖底层分区的方法。

我的解决方案是保留旧分区,为dfOut 创建一个新分区,将新分区标记为当前分区,然后在dfOut 被垃圾回收后的某个特定时间删除旧分区。

可能不是一个理想的解决方案。我很想学习一种不那么曲折的方法来解决这个问题。但它有效。

【讨论】:

以上是关于Spark SQL SaveMode.Overwrite,获取 java.io.FileNotFoundException 并需要'REFRESH TABLE tableName'的主要内容,如果未能解决你的问题,请参考以下文章

学习笔记Spark—— Spark SQL应用—— Spark SQL简介环境配置

Spark SQL - org.apache.spark.sql.AnalysisException

Spark SQL - 在 Spark Streams 上部署 SQL 查询的选项

spark -SQL 配置参数

spark sql 小样

Spark SQL(十):Hive On Spark