如何在 spark scala 中重命名 S3 文件而不是 HDFS

Posted

技术标签:

【中文标题】如何在 spark scala 中重命名 S3 文件而不是 HDFS【英文标题】:How rename S3 files not HDFS in spark scala 【发布时间】:2018-06-20 08:49:46 【问题描述】:

我在 S3 中存储了大约 100 万个文本文件。 我想根据文件夹名称重命名所有文件。

如何在 spark-scala 中做到这一点?

我正在寻找一些示例代码。

我正在使用 zeppelin 运行我的 spark 脚本。

我按照答案的建议尝试了以下代码

import org.apache.hadoop.fs._

val src = new Path("s3://trfsmallfffile/FinancialLineItem/MAIN")
val dest = new Path("s3://trfsmallfffile/FinancialLineItem/MAIN/dest")
val conf = sc.hadoopConfiguration   // assuming sc = spark context
val fs = Path.getFileSystem(conf)
fs.rename(src, dest)

但是遇到错误

<console>:110: error: value getFileSystem is not a member of object org.apache.hadoop.fs.Path
       val fs = Path.getFileSystem(conf)

【问题讨论】:

【参考方案1】:

您可以使用普通的 HDFS API,例如(输入,未测试)

val src = new Path("s3a://bucket/data/src")
val dest = new Path("s3a://bucket/data/dest")
val conf = sc.hadoopConfiguration   // assuming sc = spark context
val fs = src.getFileSystem(conf)
fs.rename(src, dest)

S3A 客户端伪造重命名的方式是每个文件的copy + delete,因此它所花费的时间与文件数和数据量成正比。 S3 会限制您:如果您尝试并行执行此操作,它可能会减慢您的速度。如果需要“一段时间”,请不要感到惊讶。

您还需要为每次 COPY 通话付费,每 1,000 次通话收取 0.005 美元,因此尝试费用约为 5 美元。在一个小目录上进行测试,直到您确定一切正常

【讨论】:

刚刚尝试过,但更新了我的问题时出现错误,请看一次 好的,在我的代码中发现错误。也就是说,如果您在这个级别上认真工作,您将需要 IDE 中的全部 Hadoop 和 Spark 源代码树。请注意,尽早开始练习 已经为此要求筹集了两个赏金..我的同事也提出了一个活跃的赏金。***.com/questions/46703623/… ... 我已经修复了我的代码,你应该使用'src.getFileSystem()`;它是一种非抽象方法。正如我警告的那样,没有输入,没有测试。 是的,我对此投了赞成票..非常感谢..但还有一件事,我在 src 文件夹中有很多文件,我想重命名并将其移动到其他文件夹。 ..我的cooleague为此创建了单独的问题..你能看看那个问题吗......我们也有100点赏金..如果你能提供帮助,那就太好了

以上是关于如何在 spark scala 中重命名 S3 文件而不是 HDFS的主要内容,如果未能解决你的问题,请参考以下文章

试图了解如何在 Go 中重命名字节数组

并行使用 scala Spark 重命名 HDFS 文件时的序列化问题

在 spark python 中重命名数据框列

Spark Scala S3 存储:权限被拒绝

如何在 Scala/Spark 中为数据框中的每一行编写一个 Json 文件并重命名文件

通过spark加载现有的s3文件在scala中给出403,但在python中没有