Hadoop HDFS:读取正在写入的序列文件

Posted

技术标签:

【中文标题】Hadoop HDFS:读取正在写入的序列文件【英文标题】:Hadoop HDFS: Read sequence files that are being written 【发布时间】:2013-01-17 11:04:10 【问题描述】:

我使用的是 Hadoop 1.0.3。

我将日志写入到 HDFS 的 Hadoop 序列文件中,我在每组日志之后调用 syncFS(),但我从不关闭文件(除非我执行每日滚动)。

我要保证的是,在文件仍在写入时,该文件可供读者使用。

我可以通过 FSDataInputStream 读取序列文件的字节,但是如果我尝试使用 SequenceFile.Reader.next(key,val),它会在第一次调用时返回 false。

我知道数据在文件中,因为我可以使用 FSDataInputStream 或 cat 命令读取它,并且我 100% 确定调用了 syncFS()。

我检查了namenode和datanode日志,没有错误或警告。

为什么 SequenceFile.Reader 无法读取我当前正在写入的文件?

【问题讨论】:

【参考方案1】:

您无法确保读取完全写入数据节点端的磁盘。您可以在DFSClient#DFSOutputStream.sync() 的文档中看到这一点:

  All data is written out to datanodes. It is not guaranteed that data has
  been flushed to persistent store on the datanode. Block allocations are
  persisted on namenode.

所以它基本上用当前信息更新namenode的块映射并将数据发送到datanode。由于您无法将数据刷新到数据节点上的磁盘,但是您直接从数据节点读取数据,因此您遇到了数据在某处缓冲且无法访问的时间范围。因此,您的序列文件阅读器会认为数据流已完成(或为空)并且无法读取向反序列化过程返回 false 的其他字节。

如果块被完全接收,数据节点将数据写入磁盘(它是预先写入的,但不能从外部读取)。因此,一旦达到您的块大小或您的文件已预先关闭并因此完成一个块,您就可以从文件中读取。这在分布式环境中完全有意义,因为您的编写器可能会死掉并且无法正确完成一个块 - 这是一个一致性问题。

因此解决方法是使块大小非常小,以便更频繁地完成块。但这不是那么有效,我希望您应该清楚您的要求不适合 HDFS。

【讨论】:

我知道您不能保证能够阅读不完整的块。但就我而言,我实际上可以使用简单的 FSInputStream 读取不完整的块数据。是 SequenceFile.Reader 无法做到,因为它以文件长度为边界,文件长度只有在块完成时才会更新。 虽然我通常可以使用直接 FSInputStream 读取一个不完整的块,但在生产服务器中,在一个巨大的文件上,我无法超越最后一个完整的块。【参考方案2】:

SequenceFile.Reader 无法读取正在写入的文件的原因是它使用文件长度来执行其魔法。

在写入第一个块时文件长度保持为 0,并且仅在块已满时更新(默认为 64MB)。 然后文件大小卡在 64MB,直到第二个块被完全写入,依此类推...

这意味着您无法使用 SequenceFile.Reader 读取序列文件中最后一个不完整的块,即使可以直接使用 FSInputStream 读取原始数据。

关闭文件也修复了文件长度,但在我的情况下,我需要在关闭文件之前读取文件。

【讨论】:

【参考方案3】:

所以我遇到了同样的问题,经过一些调查和时间后,我想出了以下可行的解决方法。

所以问题是由于序列文件创建的内部实现以及它使用的是每块 64 MB 更新的文件长度这一事实。

所以我创建了以下类来创建阅读器,并用我自己的类包装了 hadoop FS,同时我覆盖了 get length 方法以返回文件长度:

public class SequenceFileUtil 

    public SequenceFile.Reader createReader(Configuration conf, Path path) throws IOException 

        WrappedFileSystem fileSystem = new WrappedFileSystem(FileSystem.get(conf));

        return new SequenceFile.Reader(fileSystem, path, conf);
    

    private class WrappedFileSystem extends FileSystem
    
        private final FileSystem nestedFs;

        public WrappedFileSystem(FileSystem fs)
            this.nestedFs = fs;
        

        @Override
        public URI getUri() 
            return nestedFs.getUri();
        

        @Override
        public FSDataInputStream open(Path f, int bufferSize) throws IOException 
            return nestedFs.open(f,bufferSize);
        

        @Override
        public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException 
            return nestedFs.create(f, permission,overwrite,bufferSize, replication, blockSize, progress);
        

        @Override
        public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException 
            return nestedFs.append(f, bufferSize, progress);
        

        @Override
        public boolean rename(Path src, Path dst) throws IOException 
            return nestedFs.rename(src, dst);
        

        @Override
        public boolean delete(Path path) throws IOException 
            return nestedFs.delete(path);
        

        @Override
        public boolean delete(Path f, boolean recursive) throws IOException 
            return nestedFs.delete(f, recursive);
        

        @Override
        public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException 
            return nestedFs.listStatus(f);
        

        @Override
        public void setWorkingDirectory(Path new_dir) 
            nestedFs.setWorkingDirectory(new_dir);
        

        @Override
        public Path getWorkingDirectory() 
            return nestedFs.getWorkingDirectory();
        

        @Override
        public boolean mkdirs(Path f, FsPermission permission) throws IOException 
            return nestedFs.mkdirs(f, permission);
        

        @Override
        public FileStatus getFileStatus(Path f) throws IOException 
            return nestedFs.getFileStatus(f);
        


        @Override
        public long getLength(Path f) throws IOException 

            DFSClient.DFSInputStream open =  new DFSClient(nestedFs.getConf()).open(f.toUri().getPath());
            long fileLength = open.getFileLength();
            long length = nestedFs.getLength(f);

            if (length < fileLength)
                //We might have uncompleted blocks
                return fileLength;
            

            return length;
        


    

【讨论】:

【参考方案4】:

我遇到了类似的问题,我是这样解决的: http://mail-archives.apache.org/mod_mbox/hadoop-common-user/201303.mbox/%3CCALtSBbY+LX6fiKutGsybS5oLXxZbVuN0WvW_a5JbExY98hJfig@mail.gmail.com%3E

【讨论】:

我也在我的自定义 PIG 加载器中使用了这个技巧来不忽略未关闭的文件。但这不是 SequenceFile.Reader 的补丁,它仍然失败。我最终使用了基于 Protobuf 的自定义文件格式。 您能否从该链接总结相关解决方案?现在如果它坏了,这个答案就没用了。

以上是关于Hadoop HDFS:读取正在写入的序列文件的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop爬坑记——HDFS文件因Hadoop版本原因导致的追加问题

HDFS--Hadoop分布式文件系统

反序列化内存中的 Hadoop 序列文件对象

配置 Spark 写入 HDFS 的 Avro 文件大小

2. Hadoop HDFS

在 Spark/Scala 中写入 HDFS,读取 zip 文件