在 Spark/Scala 中写入 HDFS,读取 zip 文件

Posted

技术标签:

【中文标题】在 Spark/Scala 中写入 HDFS,读取 zip 文件【英文标题】:Writing to HDFS in Spark/Scala reading the zip files 【发布时间】:2017-02-17 10:01:23 【问题描述】:

我正在编写一个 spark/scala 程序来读取 ZIP 文件,解压缩它们并将内容写入一组新文件。我可以让它用于写入本地文件系统,但想知道是否有办法将输出文件写入分布式文件系统,如 HDFS。代码如下所示`

import java.util.zip.ZipInputStream
import org.apache.spark.input.PortableDataStream
import java.io._

var i =1
sc.binaryFiles("file:///d/tmp/zips/").flatMap((file:(String, PortableDataStream)) =>    
   val zipStream = new ZipInputStream(file._2.open)            
   val entry = zipStream.getNextEntry                            
   val iter = scala.io.Source.fromInputStream(zipStream).getLines          
   val fname = f"/d/tmp/myfile$i.txt" 

   i = i + 1

   val xx = iter.mkString
   val writer = new PrintWriter(new File(fname))
   writer.write(xx)
   writer.close()

   iter                                                       
).collect()

`

【问题讨论】:

【参考方案1】:

您可以使用 hadoop-common 库轻松将数据写入 HDFS(如果您使用 sbt 作为依赖项管理工具,请将该库添加到您的依赖项中)。有了它,您可以创建一个 FileSystem 对象:

 private val fs = 
    val conf = new Configuration()
    FileSystem.get(conf)
  

请务必使用您的 hadoop 集群信息(core-site.xml 等)配置文件系统

然后你可以在 HDFS 上编写,例如一个字符串到路径(在你的情况下你应该处理流),如下所示:

@throws[IOException]
  def writeAsString(hdfsPath: String, content: String) 
    val path: Path = new Path(hdfsPath)
    if (fs.exists(path)) 
      fs.delete(path, true)
    
    val dataOutputStream: FSDataOutputStream = fs.create(path)
    val bw: BufferedWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, "UTF-8"))
    bw.write(content)
    bw.close
  

【讨论】:

这个文件系统属于什么? java.nio 好像不是。 它是Hadoop文件系统,所以你需要把它带到你的依赖中【参考方案2】:
sc.binaryFiles("/user/example/zip_dir", 10)                   //make an RDD from *.zip files in HDFS
    .flatMap((file: (String, PortableDataStream)) =>         //flatmap to unzip each file
        val zipStream = new ZipInputStream(file._2.open)      //open a java.util.zip.ZipInputStream
        val entry = zipStream.getNextEntry                    //get the first entry in the stream
        val iter = Source.fromInputStream(zipStream).getLines //place entry lines into an iterator
        iter.next                                             //pop off the iterator's first line
        iter                                                  //return the iterator
    )
    .saveAsTextFile("/user/example/quoteTable_csv/result.csv")

【讨论】:

【参考方案3】:

你应该看看官方文档中的 saveAsTextFile 方法:http://spark.apache.org/docs/latest/programming-guide.html

它将允许您保存到 HDFS:

iter.saveAsTextFile("hdfs://...")

【讨论】:

因为代码iter不是RDD,所以不能写。也许首先进行转换。 是的,我认为演员阵容会很好。 RDD 应该是通过 spark 操作的数据类型,以便在集群上获取分布式数据。 这就是问题的关键所在。我已经尝试了一切我能想到的将我的迭代中的数据获取到 RDD 以启用 saveasTextFile 的使用,但没有完成。如果有人解决了这个问题,请告诉我【参考方案4】:

你可以试试 saveAsTextFile 方法。

将数据集的元素作为文本文件(或文本文件集)写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统的给定目录中。 Spark 将对每个元素调用 toString 以将其转换为文件中的一行文本。

它将每个分区保存为不同的文件,您最终得到的分区数将与输入文件的数量相同,除非您重新分区或合并。

【讨论】:

请参阅我上面的 cmets 了解为什么使用 saveasTextFile 是一个问题 不能你可以写整个RDD而不是单独写每个文件。而不是收集使用 saveAsText 文件? 这会将每个解压缩的所有数据连接到一个文件中。那不是我想要的。我希望每个解压缩的文件都在自己的单独文件中

以上是关于在 Spark/Scala 中写入 HDFS,读取 zip 文件的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spark Scala 作业中加载和写入属性文件?

如何使用 Spark/Scala 在 HDFS 上编写/创建 zip 文件?

使用 spark/scala 从 HDFS 目录中获取所有 csv 文件名

如何在 spark scala 中重命名 S3 文件而不是 HDFS

从 HDFS 加载数据 -Spark Scala [重复]

用scala在spark中读取压缩文件