如何使用 spark(scala)读取和写入(更新)同一个文件

Posted

技术标签:

【中文标题】如何使用 spark(scala)读取和写入(更新)同一个文件【英文标题】:how to read and write (update) the same file using spark (scala) 【发布时间】:2019-12-23 16:50:11 【问题描述】:

我想根据某些条件更新一个 CSV 文件,因为我读取了该文件,进行了所有需要的更新,但是当我尝试编写它时,我得到了一个 FileNotFoundException

我认为这是由于写入过程,因为当我访问路径(输入/输出文件所在的位置)时,我发现它是空的。

有没有更好的方法来更新文件?如果没有,我该如何解决FileNotFoundException 错误?

【问题讨论】:

写入另一个路径,删除旧路径并重命名/移动新路径? 检查点是 Raphael 所说的更好的主意。请检查我的答案以及在我的项目中有效的答案。如果您喜欢他们,请删除 -ve 投票,因为这不是错误的答案。 【参考方案1】:

您可以通过编写临时表/csv 或使用 checkpointing 来做到这一点:

这行得通:

sparkSession.sparkContext.setCheckpointDir("tmp")

ss.read.csv("test.csv") // read existing csv
  .withColumn("test",lit(1)) // modify
  .checkpoint(eager = true) // checkpoint, write to disk
  .write.mode("overwrite") 
  .csv("test.csv") // write to same location

【讨论】:

这将与下面描述的方式一样工作 - 写入新文件,删除,重命名。这就是我在项目中编码的方式。我的答案没有错。

以上是关于如何使用 spark(scala)读取和写入(更新)同一个文件的主要内容,如果未能解决你的问题,请参考以下文章

在 Spark/Scala 中写入 HDFS,读取 zip 文件

由于 Databricks 不公开支持 spark-redshift lib,使用 Scala spark 从 Redshift 读取/写入 Redshift 的最佳方法是啥

在顶部/使用 Spark 保存和加载 JSON 和 scala 的对象

使用 scala 从 spark 中删除 bigquery 表

在 DataFrame 的每个分区内读取、转换和写入数据

在本地文件系统(不是HDFS)中使用Scala读取Apache Spark中的文件时如何更改输入块大小[重复]