如何使用 Spark/Scala 在 HDFS 上编写/创建 zip 文件?

Posted

技术标签:

【中文标题】如何使用 Spark/Scala 在 HDFS 上编写/创建 zip 文件?【英文标题】:How to write/create zip files on HDFS using Spark/Scala? 【发布时间】:2021-08-30 13:54:30 【问题描述】:

我需要在 Apache Zeppelin 中编写一个 Spark/Scala 函数,它只是将一些已经存在于 HDFS 文件夹中的文件放入一个 zip 或 gzip 存档(或在 Windows 中易于提取的一些常见存档格式)中同一个文件夹。请问我该怎么做?会是 Java 调用吗?我看到有一个叫做 ZipOutputStream 的东西,这是正确的方法吗?任何提示表示赞赏。

谢谢

【问题讨论】:

【参考方案1】:

Spark 不支持直接从 zip 读取/写入,因此使用 ZipOutputStream 基本上是唯一的方法。

这是我用来通过 spark 压缩现有数据的代码。它递归地列出文件的目录,然后继续压缩它们。此代码不保留目录结构,但保留文件名。

输入目录:

unzipped/
├── part-00001
├── part-00002
└── part-00003

0 directories, 3 files

输出目录:

zipped/
├── part-00001.zip
├── part-00002.zip
└── part-00003.zip

0 directories, 3 files

ZipPacker.scala:

package com.haodemon.spark.compression

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem, Path
import org.apache.hadoop.io.IOUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf, SparkContext

import java.io.FileOutputStream
import java.util.zip.ZipEntry, ZipOutputStream


object ZipPacker extends Serializable 

  private def getSparkContext: SparkContext = 
    val conf: SparkConf = new SparkConf()
      .setAppName("local")
      .setMaster("local[*]")
    SparkSession.builder().config(conf).getOrCreate().sparkContext
  

  // recursively list files in a filesystem
  private def listFiles(fs: FileSystem, path: Path): List[Path] = 
    fs.listStatus(path).flatMap(p =>
        if (p.isDirectory) listFiles(fs, p.getPath)
        else List(p.getPath)
      ).toList
  

  // zip compress file one by one in parallel
  private def zip(inputPath: Path, outputDirectory: Path): Unit = 
    val outputPath = 
      val name = inputPath.getName + ".zip"
      outputDirectory + "/" + name
    
    println(s"Zipping to $outputPath")

    val zipStream = 
      val out = new FileOutputStream(outputPath)
      val zip = new ZipOutputStream(out)
      val entry = new ZipEntry(inputPath.getName)
      zip.putNextEntry(entry)
      // max compression
      zip.setLevel(9)
      zip
    

    val conf = new Configuration
    val uncompressedStream = inputPath.getFileSystem(conf).open(inputPath)
    val close = true
    IOUtils.copyBytes(uncompressedStream, zipStream, conf, close)
  

  def main(args: Array[String]): Unit = 
    val input = new Path(args(0))
    println(s"Using input path $input")

    val sc = getSparkContext
    val uncompressedFiles = 
      val conf = sc.hadoopConfiguration
      val fs = input.getFileSystem(conf)
      listFiles(fs, input)
    
    val rdd = sc.parallelize(uncompressedFiles)

    val output = new Path(args(1))
    println(s"Using output path $output")

    rdd.foreach(unzipped => zip(unzipped, output))
  


【讨论】:

以上是关于如何使用 Spark/Scala 在 HDFS 上编写/创建 zip 文件?的主要内容,如果未能解决你的问题,请参考以下文章

使用 spark/scala 从 HDFS 目录中获取所有 csv 文件名

在 Spark/Scala 中写入 HDFS,读取 zip 文件

从 HDFS 加载数据 -Spark Scala [重复]

如何在 Spark/Scala 中使用 countDistinct?

如何在倾斜列上重新分区 Spark scala 中的数据框?

Spark Scala:使用 $ 的符号的功能差异?