Hadoop HDFS:读取正在写入的序列文件
Posted
技术标签:
【中文标题】Hadoop HDFS:读取正在写入的序列文件【英文标题】:Hadoop HDFS: Read sequence files that are being written 【发布时间】:2013-01-17 11:04:10 【问题描述】:我使用的是 Hadoop 1.0.3。
我将日志写入到 HDFS 的 Hadoop 序列文件中,我在每组日志之后调用 syncFS(),但我从不关闭文件(除非我执行每日滚动)。
我要保证的是,在文件仍在写入时,该文件可供读者使用。
我可以通过 FSDataInputStream 读取序列文件的字节,但是如果我尝试使用 SequenceFile.Reader.next(key,val),它会在第一次调用时返回 false。
我知道数据在文件中,因为我可以使用 FSDataInputStream 或 cat 命令读取它,并且我 100% 确定调用了 syncFS()。
我检查了namenode和datanode日志,没有错误或警告。
为什么 SequenceFile.Reader 无法读取我当前正在写入的文件?
【问题讨论】:
【参考方案1】:您无法确保读取完全写入数据节点端的磁盘。您可以在DFSClient#DFSOutputStream.sync()
的文档中看到这一点:
All data is written out to datanodes. It is not guaranteed that data has
been flushed to persistent store on the datanode. Block allocations are
persisted on namenode.
所以它基本上用当前信息更新namenode的块映射并将数据发送到datanode。由于您无法将数据刷新到数据节点上的磁盘,但是您直接从数据节点读取数据,因此您遇到了数据在某处缓冲且无法访问的时间范围。因此,您的序列文件阅读器会认为数据流已完成(或为空)并且无法读取向反序列化过程返回 false 的其他字节。
如果块被完全接收,数据节点将数据写入磁盘(它是预先写入的,但不能从外部读取)。因此,一旦达到您的块大小或您的文件已预先关闭并因此完成一个块,您就可以从文件中读取。这在分布式环境中完全有意义,因为您的编写器可能会死掉并且无法正确完成一个块 - 这是一个一致性问题。
因此解决方法是使块大小非常小,以便更频繁地完成块。但这不是那么有效,我希望您应该清楚您的要求不适合 HDFS。
【讨论】:
我知道您不能保证能够阅读不完整的块。但就我而言,我实际上可以使用简单的 FSInputStream 读取不完整的块数据。是 SequenceFile.Reader 无法做到,因为它以文件长度为边界,文件长度只有在块完成时才会更新。 虽然我通常可以使用直接 FSInputStream 读取一个不完整的块,但在生产服务器中,在一个巨大的文件上,我无法超越最后一个完整的块。【参考方案2】:SequenceFile.Reader 无法读取正在写入的文件的原因是它使用文件长度来执行其魔法。
在写入第一个块时文件长度保持为 0,并且仅在块已满时更新(默认为 64MB)。 然后文件大小卡在 64MB,直到第二个块被完全写入,依此类推...
这意味着您无法使用 SequenceFile.Reader 读取序列文件中最后一个不完整的块,即使可以直接使用 FSInputStream 读取原始数据。
关闭文件也修复了文件长度,但在我的情况下,我需要在关闭文件之前读取文件。
【讨论】:
【参考方案3】:所以我遇到了同样的问题,经过一些调查和时间后,我想出了以下可行的解决方法。
所以问题是由于序列文件创建的内部实现以及它使用的是每块 64 MB 更新的文件长度这一事实。
所以我创建了以下类来创建阅读器,并用我自己的类包装了 hadoop FS,同时我覆盖了 get length 方法以返回文件长度:
public class SequenceFileUtil
public SequenceFile.Reader createReader(Configuration conf, Path path) throws IOException
WrappedFileSystem fileSystem = new WrappedFileSystem(FileSystem.get(conf));
return new SequenceFile.Reader(fileSystem, path, conf);
private class WrappedFileSystem extends FileSystem
private final FileSystem nestedFs;
public WrappedFileSystem(FileSystem fs)
this.nestedFs = fs;
@Override
public URI getUri()
return nestedFs.getUri();
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException
return nestedFs.open(f,bufferSize);
@Override
public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException
return nestedFs.create(f, permission,overwrite,bufferSize, replication, blockSize, progress);
@Override
public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException
return nestedFs.append(f, bufferSize, progress);
@Override
public boolean rename(Path src, Path dst) throws IOException
return nestedFs.rename(src, dst);
@Override
public boolean delete(Path path) throws IOException
return nestedFs.delete(path);
@Override
public boolean delete(Path f, boolean recursive) throws IOException
return nestedFs.delete(f, recursive);
@Override
public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException
return nestedFs.listStatus(f);
@Override
public void setWorkingDirectory(Path new_dir)
nestedFs.setWorkingDirectory(new_dir);
@Override
public Path getWorkingDirectory()
return nestedFs.getWorkingDirectory();
@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException
return nestedFs.mkdirs(f, permission);
@Override
public FileStatus getFileStatus(Path f) throws IOException
return nestedFs.getFileStatus(f);
@Override
public long getLength(Path f) throws IOException
DFSClient.DFSInputStream open = new DFSClient(nestedFs.getConf()).open(f.toUri().getPath());
long fileLength = open.getFileLength();
long length = nestedFs.getLength(f);
if (length < fileLength)
//We might have uncompleted blocks
return fileLength;
return length;
【讨论】:
【参考方案4】:我遇到了类似的问题,我是这样解决的: http://mail-archives.apache.org/mod_mbox/hadoop-common-user/201303.mbox/%3CCALtSBbY+LX6fiKutGsybS5oLXxZbVuN0WvW_a5JbExY98hJfig@mail.gmail.com%3E
【讨论】:
我也在我的自定义 PIG 加载器中使用了这个技巧来不忽略未关闭的文件。但这不是 SequenceFile.Reader 的补丁,它仍然失败。我最终使用了基于 Protobuf 的自定义文件格式。 您能否从该链接总结相关解决方案?现在如果它坏了,这个答案就没用了。以上是关于Hadoop HDFS:读取正在写入的序列文件的主要内容,如果未能解决你的问题,请参考以下文章