并行使用 scala Spark 重命名 HDFS 文件时的序列化问题

Posted

技术标签:

【中文标题】并行使用 scala Spark 重命名 HDFS 文件时的序列化问题【英文标题】:Serialization issue while renaming HDFS File using scala Spark in parallel 【发布时间】:2018-10-05 15:26:07 【问题描述】:

我想使用 spark 并行重命名 HDFS 文件。但是我得到了序列化异常,我在我的代码之后提到了这个异常。 我在使用 spark.sparkContext.parallelize 时遇到了这个问题。我还可以在循环中重命名所有文件。

  def renameHdfsToS3(spark : SparkSession, hdfsFolder :String, outputFileName:String,
                     renameFunction: (String,String) => String, bktOutput:String, folderOutput:String, kmsKey:String): Boolean = 
    try 
      val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
      val path = new Path(hdfsFolder)
      val files = fs.listStatus(path)
        .filter(fs => fs.isFile)

      val parallelRename=spark.sparkContext.parallelize(files).map(
        f=>
          parallelRenameHdfs(fs,outputFileName,renamePartFileWithTS,f)
        
      )
      val hdfsTopLevelPath=fs.getWorkingDirectory()+"/"+hdfsFolder
      return true
     catch 
      case NonFatal(e) => 
        e.printStackTrace()
        return false
      
    
  

下面是我得到的异常

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340)

Caused by: java.io.NotSerializableException: org.apache.hadoop.fs.LocalFileSystem
Serialization stack:
    - object not serializable (class: org.apache.hadoop.fs.LocalFileSystem, value: org.apache.hadoop.fs.LocalFileSystem@1d96d872)
    - field (class:         at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)

【问题讨论】:

不确定答案,但我认为这不是正确的方法。 见***.com/questions/45430588/… @thebluephantom 嗨,我已经检查了那个帖子,那里的文件被串行重命名..但我正在寻找一种可以并行重命名所有文件的方法。 看答案。注意 .par !!! 抱歉最近没有发布..谢谢! 【参考方案1】:

该方法不正确,因为 sc.parallelize 用于通过 RDD 使用数据。您需要在操作系统级别工作。存在许多这样的帖子。

这样的东西应该足以将它与您自己的逻辑混合,注意允许并行处理,例如:

originalpath.par.foreach( e => hdfs.rename(e,e.suffix("finish")))

您需要检查.par 是如何定义并行性的。看这里https://docs.scala-lang.org/overviews/parallel-collections/configuration.html

【讨论】:

以上是关于并行使用 scala Spark 重命名 HDFS 文件时的序列化问题的主要内容,如果未能解决你的问题,请参考以下文章

在 Scala 中设计和并行化 Spark 应用程序的最佳方法 [关闭]

HDFS中的Pyspark重命名文件

如何在 Scala/Spark 中为数据框中的每一行编写一个 Json 文件并重命名文件

如何使用 Spark/Scala 在 HDFS 上编写/创建 zip 文件?

使用 spark/scala 从 HDFS 目录中获取所有 csv 文件名

Hadoop 之 Spark 安装配置与示例