Apache Commons CompressorInputStream 和 HDFS Gzip 文件错误 No Compressor found for the stream signature
Posted
技术标签:
【中文标题】Apache Commons CompressorInputStream 和 HDFS Gzip 文件错误 No Compressor found for the stream signature【英文标题】:Apache Commons CompressorInputStream and HDFS Gzip files error No Compressor found for the stream signature 【发布时间】:2019-04-27 05:12:47 【问题描述】:当编写软件从 HDFS 文件系统读取不同类型的文件时(例如,有些是 tar 存档,有些是纯文本,有些是二进制,有些是 gzip 或其他压缩),我发现我可以使用CompressorStreamFactory.detect 方法可按预期正确识别 gzip 压缩文件,当我尝试创建 BufferedReader 以逐行读取压缩数据时出现错误。
我尝试了以下方法:
val hdfsConf: Configuration = new Configuration()
hdfsConf.set("fs.hdfs.impl",classOf[DistributedFileSystem].getName)
hdfsConf.set("fs.file.impl",classOf[LocalFileSystem].getName)
hdfsConf.set(FileSystem.DEFAULT_FS,"hdfs://a-namenode-that-exists:8020")
val fs: FileSystem = FileSystem.get(new URI("hdfs://a-namenode-that-exists:8020"),hdfsConf)
def getFiles(directory: Path): Unit =
val iter: RemoteIterator[LocatedFileStatus] = fs.listFiles(directory,true)
var c: Int = 0
while(iter.hasNext && c < 3)
val fileStatus: LocatedFileStatus = iter.next()
val path: Path = fileStatus.getPath
val hfs: FileSystem = path.getFileSystem(hdfsConf)
val is: FSDataInputStream = hfs.open(path)
val t: String = CompressorStreamFactory.detect(new BufferedInputStream(is))
System.out.println(s"|||||||||| $t |=| $path.toString")
val reader: BufferedReader = new BufferedReader(new InputStreamReader(new CompressorStreamFactory().createCompressorInputStream(new BufferedInputStream(is))))
var cc: Int = 0
while(cc < 10)
val line: String = reader.readLine()
System.out.println(s"|||||||||| $line")
cc += 1
c += 1
getFiles(new Path("/some/directory/that/is/definitely/there/"))
我希望由于我能够成功使用 CompressorStreamFactory.detect 方法将我的文件正确识别为 gzip,因此读取文件也应该可以工作。 hdfs open 方法返回的结果类是 FSDataInputStream,它派生自 InputStream(和 FilteredInputStream),因此,我希望因为我已经广泛使用 Apache Commons Compress 库从常规 Linux 文件系统读取存档和压缩文件,并且我可以识别正在使用的压缩,它应该也可以在 HDFS 上正常工作......但是,我收到一个错误:
Exception in thread "main" org.apache.commons.compress.compressors.CompressorException: No Compressor found for the stream signature.
at org.apache.commons.compress.compressors.CompressorStreamFactory.detect(CompressorStreamFactory.java:525)
at org.apache.commons.compress.compressors.CompressorStreamFactory.createCompressorInputStream(CompressorStreamFactory.java:542)
at myorg.myproject.scratch.HdfsTest$.getFiles$1(HdfsTest.scala:34)
at myorg.myproject.scratch.HdfsTest$.main(HdfsTest.scala:47)
at myorg.myproject.scratch.HdfsTest.main(HdfsTest.scala)
我非常喜欢 Apache Commons 库,因为它的工厂方法降低了我读取归档和压缩文件(不仅仅是 tar 或 gzip)的代码的复杂性。我希望对检测工作的原因有一个简单的解释,但阅读却没有......但我已经没有如何解决这个问题的想法了。我唯一的想法是 FSDataInputStream 的 FilteredInputStream 起源可能会把事情搞砸......但我不知道如果这是实际问题,如何解决这个问题。
【问题讨论】:
【参考方案1】:在调用CompressorStreamFactory.detect(new BufferedInputStream(is))
后输入流is
已经部分消耗。所以它看起来格式不正确CompressorStreamFactory().createCompressorInputStream
。尝试重新打开文件而不是重复使用 is
。
【讨论】:
感谢您抽出宝贵时间查看此内容。不幸的是,工厂在内部调用了检测方法。我曾尝试做一个标记并重置以查看是否是问题所在,但没有帮助。 我删除了val t: String = CompressorStreamFactory.detect(new BufferedInputStream(is))
行,之后它就起作用了。我的回答清楚吗?以上是关于Apache Commons CompressorInputStream 和 HDFS Gzip 文件错误 No Compressor found for the stream signature的主要内容,如果未能解决你的问题,请参考以下文章
如何在没有 IDE 的情况下使用 Apache Commons Lang 代码? (org.apache.commons.lang3)