使用 spark 读取压缩文件*带有自定义扩展名*

Posted

技术标签:

【中文标题】使用 spark 读取压缩文件*带有自定义扩展名*【英文标题】:Read a compressed file *with custom extension* with spark 【发布时间】:2017-06-05 16:04:03 【问题描述】:

我想使用 sc.textFile("path/to/file.Z") 的等效项将 gzip 压缩文件读入 RDD[String]

如果不是gz,而是Z,则我的文件扩展名除外,因此文件不会被识别为gzip。

我无法重命名它们,因为它会破坏生产代码。我不想复制它们,因为它们很大而且很多。我想我可以使用某种符号链接,但我想先看看 scala/spark 是否有办法(我现在在本地 Windows 机器上)。

我怎样才能有效地阅读这个文件?

【问题讨论】:

相关:***.com/q/39560841/877069 还有一个相关的:有一个issue on the Spark tracker 在读取文件时添加了一种明确指定压缩编解码器的方法,因此 Spark 不会从文件扩展名中推断出来。 【参考方案1】:

这里有解决此问题的解决方法http://arjon.es/2015/10/02/reading-compressed-data-with-spark-using-unknown-file-extensions/

相关部分:

...扩展 GzipCodec 并覆盖 getDefaultExtension 方法。

package smx.ananke.spark.util.codecs

import org.apache.hadoop.io.compress.GzipCodec

class TmpGzipCodec extends GzipCodec 

  override def getDefaultExtension(): String = ".gz.tmp" // You should change it to ".Z"


现在我们刚刚注册了这个编解码器,设置 SparkConf 上的 spark.hadoop.io.compression.codecs:

val conf = new SparkConf()

// Custom Codec that process .gz.tmp extensions as a common Gzip format
conf.set("spark.hadoop.io.compression.codecs", "smx.ananke.spark.util.codecs.TmpGzipCodec")

val sc = new SparkContext(conf)

val data = sc.textFile("s3n://my-data-bucket/2015/09/21/13/*")

【讨论】:

如果有人需要在托管服务中作为 Databricks 执行此操作,我已经创建了项目以在此处生成 lib github.com/antonmry/spark-gzip-custom-extension

以上是关于使用 spark 读取压缩文件*带有自定义扩展名*的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 spark DF 或 DS 读取“.gz”压缩文件?

spark.read.excel - 使用自定义架构时不读取所有 Excel 行

用scala在spark中读取压缩文件

我可以使用 Spark 进行自定义计算吗?

在 Spark 2.3.0 中读取 Zstandard 压缩文件

Spark 2.1.0:读取压缩的 csv 文件