pyspark如何加载压缩的snappy文件

Posted

技术标签:

【中文标题】pyspark如何加载压缩的snappy文件【英文标题】:pyspark how to load compressed snappy file 【发布时间】:2015-04-25 21:59:18 【问题描述】:

我已经使用 python-snappy 压缩了一个文件并将它放在我的 hdfs 存储中。我现在正尝试像这样阅读它,但我得到了以下回溯。我找不到如何读取文件的示例,以便我可以处理它。我可以很好地阅读文本文件(未压缩)版本。我应该使用 sc.sequenceFile 吗?谢谢!

I first compressed the file and pushed it to hdfs

python-snappy -m snappy -c gene_regions.vcf gene_regions.vcf.snappy
hdfs dfs -put gene_regions.vcf.snappy /

I then added the following to spark-env.sh
export SPARK_EXECUTOR_MEMORY=16G                                                
export HADOOP_HOME=/usr/local/hadoop                                            

export JAVA_LIBRARY_PATH=$JAVA_LIBRARY_PATH:$HADOOP_HOME/lib/native             
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native                 
export SPARK_LIBRARY_PATH=$SPARK_LIBRARY_PATH:$HADOOP_HOME/lib/native           
export SPARK_CLASSPATH=$SPARK_CLASSPATH:$HADOOP_HOME/lib/lib/snappy-java-1.1.1.8-SNAPSHOT.jar

I then launch my spark master and slave and finally my ipython notebook where I am executing the code below.

a_file = sc.textFile("hdfs://master:54310/gene_regions.vcf.snappy")
a_file.first()

ValueError Traceback(最近一次调用最后一次) 在 () ----> 1 a_file.first()

/home/user/Software/spark-1.3.0-bin-hadoop2.4/python/pyspark/rdd.pyc in first(self) 1244 如果 rs: 第1245章 -> 1246 引发 ValueError("RDD 为空") 1247 第1248章

ValueError: RDD 为空

Working code (uncompressed) text file
a_file = sc.textFile("hdfs://master:54310/gene_regions.vcf")
a_file.first()

输出: u'##fileformat=VCFv4.1'

【问题讨论】:

请完善您的问题。另外,请提供更多相关代码(例如-如何保存文件) 【参考方案1】:

这里的问题是 python-snappy 与 Hadoop 的 snappy 编解码器不兼容,当 Spark 看到“.snappy”后缀时,它将使用它来读取数据。它们基于相同的底层算法,但它们不兼容,因为您可以使用一个压缩并使用另一个解压缩。

您可以通过首先使用 Spark 或 Hadoop 将数据写入 snappy 来完成这项工作。或者通过让 Spark 将您的数据读取为二进制 blob,然后您自己手动调用 python-snappy 解压缩(请参阅此处的 binaryFiles http://spark.apache.org/docs/latest/api/python/pyspark.html)。二进制 blob 方法有点脆弱,因为它需要为每个输入文件将整个文件放入内存中。但是,如果您的数据足够小,则可以使用。

【讨论】:

感谢帕特里克,这很有意义。我阅读了一些关于 Hadoop 的 snappy 编解码器的更多信息,它似乎用于在减少所有内容之前从映射器生成的中间文件。在将文本文件推送到 hdfs 存储之前,是否可以使用命令行实用程序使用 hadoop snappy 编解码器压缩我的文本文件?我基本上有大约 10,000 个 5000 万行的文本文件。看起来这可能有效...github.com/kubo/snzip 这个现在已经过时了,python-snappy 支持 hadoop-snappy 虽然不是很清楚。【参考方案2】:

已接受的答案现已过时。你可以使用 python-snappy 来压缩 hadoop-snappy,但是文档几乎没有。 示例:

import snappy
with open('test.json.snappy', 'wb') as out_file:
    data=json.dumps('test':'somevalue','test2':'somevalue2').encode('utf-8')
    compressor = snappy.hadoop_snappy.StreamCompressor()
    compressed = compressor.compress(data)
    out_file.write(compressed)

您也可以使用命令行,其中选项更直接,使用 -t hadoop_snappy 标志。示例:

echo "'test':'somevalue','test2':'somevalue2'" | python -m snappy -t hadoop_snappy -c -test.json.snappy

【讨论】:

【参考方案3】:

好的,我找到了解决方案!

构建这个... https://github.com/liancheng/snappy-utils 在 ubuntu 14.10 上,我必须安装 gcc-4.4 才能构建它对我在这里看到的错误的评论 https://code.google.com/p/hadoop-snappy/issues/detail?id=9

我现在可以像这样在命令行使用 snappy 压缩文本文件

snappy -c gene_regions.vcf -o gene_regions.vcf.snappy

转储到 hdfs

hdfs dfs -put gene_regions.vcf.snappy

然后在 pyspark 中加载它!

a_file = sc.textFile("hdfs://master:54310/gene_regions.vcf.snappy")
a_file.first()

瞧! vcf 的标头...

u'##fileformat=VCFv4.1'

【讨论】:

【参考方案4】:

不确定我的文件有哪些snappy 编解码器,但spark.read.text 对我来说没有任何问题。

【讨论】:

以上是关于pyspark如何加载压缩的snappy文件的主要内容,如果未能解决你的问题,请参考以下文章

如何在我的 pyspark 代码中访问 S3 中的 Amazon kinesis 流文件?

在 pig 中读取 snappy 压缩输入

from pyspark.sql.snappy import SnappyContext - ImportError: No module named snappy

Flink 实战系列Flink 同步 Kafka 数据到 HDFS parquet 格式存储 snappy 压缩

Flink 实战系列Flink 同步 Kafka 数据到 HDFS parquet 格式存储 snappy 压缩

在 Pyspark 中合并 DataFrame