如何使用 Spark/Scala 在 HDFS 上编写/创建 zip 文件?
Posted
技术标签:
【中文标题】如何使用 Spark/Scala 在 HDFS 上编写/创建 zip 文件?【英文标题】:How to write/create zip files on HDFS using Spark/Scala? 【发布时间】:2021-08-30 13:54:30 【问题描述】:我需要在 Apache Zeppelin 中编写一个 Spark/Scala 函数,它只是将一些已经存在于 HDFS 文件夹中的文件放入一个 zip 或 gzip 存档(或在 Windows 中易于提取的一些常见存档格式)中同一个文件夹。请问我该怎么做?会是 Java 调用吗?我看到有一个叫做 ZipOutputStream 的东西,这是正确的方法吗?任何提示表示赞赏。
谢谢
【问题讨论】:
【参考方案1】:Spark 不支持直接从 zip 读取/写入,因此使用 ZipOutputStream
基本上是唯一的方法。
这是我用来通过 spark 压缩现有数据的代码。它递归地列出文件的目录,然后继续压缩它们。此代码不保留目录结构,但保留文件名。
输入目录:
unzipped/
├── part-00001
├── part-00002
└── part-00003
0 directories, 3 files
输出目录:
zipped/
├── part-00001.zip
├── part-00002.zip
└── part-00003.zip
0 directories, 3 files
ZipPacker.scala:
package com.haodemon.spark.compression
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem, Path
import org.apache.hadoop.io.IOUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf, SparkContext
import java.io.FileOutputStream
import java.util.zip.ZipEntry, ZipOutputStream
object ZipPacker extends Serializable
private def getSparkContext: SparkContext =
val conf: SparkConf = new SparkConf()
.setAppName("local")
.setMaster("local[*]")
SparkSession.builder().config(conf).getOrCreate().sparkContext
// recursively list files in a filesystem
private def listFiles(fs: FileSystem, path: Path): List[Path] =
fs.listStatus(path).flatMap(p =>
if (p.isDirectory) listFiles(fs, p.getPath)
else List(p.getPath)
).toList
// zip compress file one by one in parallel
private def zip(inputPath: Path, outputDirectory: Path): Unit =
val outputPath =
val name = inputPath.getName + ".zip"
outputDirectory + "/" + name
println(s"Zipping to $outputPath")
val zipStream =
val out = new FileOutputStream(outputPath)
val zip = new ZipOutputStream(out)
val entry = new ZipEntry(inputPath.getName)
zip.putNextEntry(entry)
// max compression
zip.setLevel(9)
zip
val conf = new Configuration
val uncompressedStream = inputPath.getFileSystem(conf).open(inputPath)
val close = true
IOUtils.copyBytes(uncompressedStream, zipStream, conf, close)
def main(args: Array[String]): Unit =
val input = new Path(args(0))
println(s"Using input path $input")
val sc = getSparkContext
val uncompressedFiles =
val conf = sc.hadoopConfiguration
val fs = input.getFileSystem(conf)
listFiles(fs, input)
val rdd = sc.parallelize(uncompressedFiles)
val output = new Path(args(1))
println(s"Using output path $output")
rdd.foreach(unzipped => zip(unzipped, output))
【讨论】:
以上是关于如何使用 Spark/Scala 在 HDFS 上编写/创建 zip 文件?的主要内容,如果未能解决你的问题,请参考以下文章
使用 spark/scala 从 HDFS 目录中获取所有 csv 文件名
在 Spark/Scala 中写入 HDFS,读取 zip 文件
如何在 Spark/Scala 中使用 countDistinct?