在 Spark/Scala 中写入 HDFS,读取 zip 文件
Posted
技术标签:
【中文标题】在 Spark/Scala 中写入 HDFS,读取 zip 文件【英文标题】:Writing to HDFS in Spark/Scala reading the zip files 【发布时间】:2017-02-17 10:01:23 【问题描述】:我正在编写一个 spark/scala 程序来读取 ZIP 文件,解压缩它们并将内容写入一组新文件。我可以让它用于写入本地文件系统,但想知道是否有办法将输出文件写入分布式文件系统,如 HDFS。代码如下所示`
import java.util.zip.ZipInputStream
import org.apache.spark.input.PortableDataStream
import java.io._
var i =1
sc.binaryFiles("file:///d/tmp/zips/").flatMap((file:(String, PortableDataStream)) =>
val zipStream = new ZipInputStream(file._2.open)
val entry = zipStream.getNextEntry
val iter = scala.io.Source.fromInputStream(zipStream).getLines
val fname = f"/d/tmp/myfile$i.txt"
i = i + 1
val xx = iter.mkString
val writer = new PrintWriter(new File(fname))
writer.write(xx)
writer.close()
iter
).collect()
`
【问题讨论】:
【参考方案1】:您可以使用 hadoop-common 库轻松将数据写入 HDFS(如果您使用 sbt 作为依赖项管理工具,请将该库添加到您的依赖项中)。有了它,您可以创建一个 FileSystem 对象:
private val fs =
val conf = new Configuration()
FileSystem.get(conf)
请务必使用您的 hadoop 集群信息(core-site.xml 等)配置文件系统
然后你可以在 HDFS 上编写,例如一个字符串到路径(在你的情况下你应该处理流),如下所示:
@throws[IOException]
def writeAsString(hdfsPath: String, content: String)
val path: Path = new Path(hdfsPath)
if (fs.exists(path))
fs.delete(path, true)
val dataOutputStream: FSDataOutputStream = fs.create(path)
val bw: BufferedWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, "UTF-8"))
bw.write(content)
bw.close
【讨论】:
这个文件系统属于什么? java.nio 好像不是。 它是Hadoop文件系统,所以你需要把它带到你的依赖中【参考方案2】:sc.binaryFiles("/user/example/zip_dir", 10) //make an RDD from *.zip files in HDFS
.flatMap((file: (String, PortableDataStream)) => //flatmap to unzip each file
val zipStream = new ZipInputStream(file._2.open) //open a java.util.zip.ZipInputStream
val entry = zipStream.getNextEntry //get the first entry in the stream
val iter = Source.fromInputStream(zipStream).getLines //place entry lines into an iterator
iter.next //pop off the iterator's first line
iter //return the iterator
)
.saveAsTextFile("/user/example/quoteTable_csv/result.csv")
【讨论】:
【参考方案3】:你应该看看官方文档中的 saveAsTextFile 方法:http://spark.apache.org/docs/latest/programming-guide.html
它将允许您保存到 HDFS:
iter.saveAsTextFile("hdfs://...")
【讨论】:
因为代码iter不是RDD,所以不能写。也许首先进行转换。 是的,我认为演员阵容会很好。 RDD 应该是通过 spark 操作的数据类型,以便在集群上获取分布式数据。 这就是问题的关键所在。我已经尝试了一切我能想到的将我的迭代中的数据获取到 RDD 以启用 saveasTextFile 的使用,但没有完成。如果有人解决了这个问题,请告诉我【参考方案4】:你可以试试 saveAsTextFile 方法。
将数据集的元素作为文本文件(或文本文件集)写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统的给定目录中。 Spark 将对每个元素调用 toString 以将其转换为文件中的一行文本。
它将每个分区保存为不同的文件,您最终得到的分区数将与输入文件的数量相同,除非您重新分区或合并。
【讨论】:
请参阅我上面的 cmets 了解为什么使用 saveasTextFile 是一个问题 不能你可以写整个RDD而不是单独写每个文件。而不是收集使用 saveAsText 文件? 这会将每个解压缩的所有数据连接到一个文件中。那不是我想要的。我希望每个解压缩的文件都在自己的单独文件中以上是关于在 Spark/Scala 中写入 HDFS,读取 zip 文件的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Spark/Scala 在 HDFS 上编写/创建 zip 文件?
使用 spark/scala 从 HDFS 目录中获取所有 csv 文件名