并行使用 scala Spark 重命名 HDFS 文件时的序列化问题
Posted
技术标签:
【中文标题】并行使用 scala Spark 重命名 HDFS 文件时的序列化问题【英文标题】:Serialization issue while renaming HDFS File using scala Spark in parallel 【发布时间】:2018-10-05 15:26:07 【问题描述】:我想使用 spark 并行重命名 HDFS 文件。但是我得到了序列化异常,我在我的代码之后提到了这个异常。 我在使用 spark.sparkContext.parallelize 时遇到了这个问题。我还可以在循环中重命名所有文件。
def renameHdfsToS3(spark : SparkSession, hdfsFolder :String, outputFileName:String,
renameFunction: (String,String) => String, bktOutput:String, folderOutput:String, kmsKey:String): Boolean =
try
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val path = new Path(hdfsFolder)
val files = fs.listStatus(path)
.filter(fs => fs.isFile)
val parallelRename=spark.sparkContext.parallelize(files).map(
f=>
parallelRenameHdfs(fs,outputFileName,renamePartFileWithTS,f)
)
val hdfsTopLevelPath=fs.getWorkingDirectory()+"/"+hdfsFolder
return true
catch
case NonFatal(e) =>
e.printStackTrace()
return false
下面是我得到的异常
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340)
Caused by: java.io.NotSerializableException: org.apache.hadoop.fs.LocalFileSystem
Serialization stack:
- object not serializable (class: org.apache.hadoop.fs.LocalFileSystem, value: org.apache.hadoop.fs.LocalFileSystem@1d96d872)
- field (class: at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
【问题讨论】:
不确定答案,但我认为这不是正确的方法。 见***.com/questions/45430588/… @thebluephantom 嗨,我已经检查了那个帖子,那里的文件被串行重命名..但我正在寻找一种可以并行重命名所有文件的方法。 看答案。注意 .par !!! 抱歉最近没有发布..谢谢! 【参考方案1】:该方法不正确,因为 sc.parallelize 用于通过 RDD 使用数据。您需要在操作系统级别工作。存在许多这样的帖子。
这样的东西应该足以将它与您自己的逻辑混合,注意允许并行处理,例如:
originalpath.par.foreach( e => hdfs.rename(e,e.suffix("finish")))
您需要检查.par 是如何定义并行性的。看这里https://docs.scala-lang.org/overviews/parallel-collections/configuration.html
【讨论】:
以上是关于并行使用 scala Spark 重命名 HDFS 文件时的序列化问题的主要内容,如果未能解决你的问题,请参考以下文章
在 Scala 中设计和并行化 Spark 应用程序的最佳方法 [关闭]
如何在 Scala/Spark 中为数据框中的每一行编写一个 Json 文件并重命名文件
如何使用 Spark/Scala 在 HDFS 上编写/创建 zip 文件?